# Metadata Indexing Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Add a SQLite metadata index that makes date/tag filtering O(log n) while preserving filesystem-as-source-of-truth.

**Architecture:** New `engram.index` module manages WAL-mode SQLite with optional FTS5. Librarian upserts during its existing loop. Server queries index for structured filters, passes whitelist to ripgrep. Missing/corrupt index degrades to full-vault ripgrep.

**Tech Stack:** Python `sqlite3` (stdlib), FTS5 (with LIKE fallback), Pydantic v2, pytest

**Spec:** `docs/superpowers/specs/2026-03-28-metadata-indexing-design.md`

---

## File Structure

| File | Responsibility |
|------|---------------|
| `src/engram/config.py` | Add `INDEX_PATH` constant |
| `src/engram/index.py` | **New.** SQLite connection, schema, upsert, query, reconcile |
| `src/engram/librarian.py` | Refactor `_process_file()` to return `ProcessResult`; integrate index upsert + reconcile |
| `src/engram/models.py` | Make `query` optional; add `tags`, `date_from`, `date_to`; add `model_validator` |
| `src/engram/server.py` | Query index for structured filters; handle metadata-only queries |
| `tests/test_index.py` | **New.** Index module tests |
| `tests/test_librarian.py` | Update for `ProcessResult` return type |
| `tests/test_server.py` | Update for optional `query` and new fields |
| `.gitignore` | Add `vault_index.sqlite` |

---

### Task 1: Add INDEX_PATH to config

**Files:**
- Modify: `src/engram/config.py:16` (after `LOG_DIR`)
- Modify: `.gitignore:38` (after `util/`)

- [ ] **Step 1: Add the constant**

In `src/engram/config.py`, after line 16 (`LOG_DIR`), add:

```python
INDEX_PATH: Path = ENGRAM_ROOT / "vault_index.sqlite"
```

- [ ] **Step 2: Add to .gitignore**

In `.gitignore`, after the `util/` line, add:

```
# Metadata index (derivative — rebuilt by engram-index)
vault_index.sqlite
vault_index.sqlite-wal
vault_index.sqlite-shm
```
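
The `-wal` and `-shm` entries cover SQLite's write-ahead-log side files, which appear next to the database once Task 2 enables `PRAGMA journal_mode=WAL`. A quick stdlib sketch (paths here are illustrative) showing why all three names must be ignored together:

```python
import sqlite3
import tempfile
from pathlib import Path

# In WAL mode, SQLite keeps two side files next to the database while
# connections are open: <name>-wal (the write-ahead log) and <name>-shm
# (shared memory for the WAL index).
tmp = Path(tempfile.mkdtemp())
db = tmp / "vault_index.sqlite"
conn = sqlite3.connect(str(db))
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("CREATE TABLE t (x)")  # first write creates the side files
conn.commit()

assert (tmp / "vault_index.sqlite-wal").exists()
assert (tmp / "vault_index.sqlite-shm").exists()
conn.close()
```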

- [ ] **Step 3: Verify**

Run: `python -c "from engram.config import INDEX_PATH; print(INDEX_PATH)"`
Expected: `/home/geodesix/engram/vault_index.sqlite`

- [ ] **Step 4: Commit**

```bash
git add src/engram/config.py .gitignore
git commit -m "config: add INDEX_PATH for metadata index"
```

---

### Task 2: Create index module — schema and open_index

**Files:**
- Create: `src/engram/index.py`
- Create: `tests/test_index.py`

- [ ] **Step 1: Write test for open_index creating fresh DB**

```python
"""Tests for the Engram metadata index."""

from __future__ import annotations

import sqlite3
from pathlib import Path

from engram.index import open_index


class TestOpenIndex:
    """open_index must create or open a SQLite index safely."""

    def test_creates_new_database(self, tmp_path: Path) -> None:
        db = tmp_path / "test.sqlite"
        conn = open_index(db)

        assert conn is not None
        # Verify schema exists
        cursor = conn.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name='entities'"
        )
        assert cursor.fetchone() is not None
        conn.close()

    def test_reopens_existing_database(self, tmp_path: Path) -> None:
        db = tmp_path / "test.sqlite"
        conn1 = open_index(db)
        assert conn1 is not None
        conn1.execute(
            "INSERT INTO entities VALUES ('a.md', 1.0, '[]', 'test', '')"
        )
        conn1.commit()
        conn1.close()

        conn2 = open_index(db)
        assert conn2 is not None
        row = conn2.execute("SELECT path FROM entities").fetchone()
        assert row[0] == "a.md"
        conn2.close()

    def test_corrupt_file_returns_none(self, tmp_path: Path) -> None:
        db = tmp_path / "bad.sqlite"
        db.write_text("this is not a sqlite file")

        conn = open_index(db)
        assert conn is None
```

- [ ] **Step 2: Run tests to verify they fail**

Run: `pytest tests/test_index.py -v`
Expected: FAIL — `engram.index` does not exist

- [ ] **Step 3: Write minimal index.py with open_index and schema**

Create `src/engram/index.py`:

```python
"""Metadata index for structured queries (date, tags, session_id).

The index is a derivative cache — the filesystem remains the source of truth.
If the index is missing or corrupt, the system falls back to full-vault ripgrep.
"""

from __future__ import annotations

import json
import logging
import sqlite3
from pathlib import Path

logger = logging.getLogger(__name__)

_fts5_available: bool | None = None


def _has_fts5(conn: sqlite3.Connection) -> bool:
    """Probe whether FTS5 is compiled into this Python's SQLite."""
    global _fts5_available
    if _fts5_available is not None:
        return _fts5_available
    try:
        conn.execute("CREATE VIRTUAL TABLE _fts5_probe USING fts5(x)")
        conn.execute("DROP TABLE _fts5_probe")
        _fts5_available = True
    except sqlite3.OperationalError:
        _fts5_available = False
    return _fts5_available


def _create_schema(conn: sqlite3.Connection, use_fts5: bool) -> None:
    """Create tables and indexes if they don't exist."""
    conn.execute("""
        CREATE TABLE IF NOT EXISTS entities (
            path       TEXT PRIMARY KEY,
            mtime      REAL NOT NULL,
            topics     TEXT,
            summary    TEXT,
            created_at TEXT
        )
    """)
    conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_created ON entities(created_at)"
    )
    conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_mtime ON entities(mtime)"
    )
    if use_fts5:
        # Check if FTS table already exists
        row = conn.execute(
            "SELECT name FROM sqlite_master "
            "WHERE type='table' AND name='entities_fts'"
        ).fetchone()
        if row is None:
            conn.execute("""
                CREATE VIRTUAL TABLE entities_fts USING fts5(
                    path, topics, summary,
                    content='entities', content_rowid='rowid'
                )
            """)
    conn.commit()


def open_index(path: Path) -> sqlite3.Connection | None:
    """Open (or create) the metadata index. Returns None if corrupt."""
    try:
        conn = sqlite3.connect(str(path))
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA synchronous=NORMAL")
        use_fts5 = _has_fts5(conn)
        _create_schema(conn, use_fts5)
        return conn
    except (sqlite3.DatabaseError, sqlite3.OperationalError):
        logger.warning("Index at %s is corrupt — falling back to ripgrep", path)
        return None
```

- [ ] **Step 4: Run tests to verify they pass**

Run: `pytest tests/test_index.py -v`
Expected: 3 PASSED

- [ ] **Step 5: Commit**

```bash
git add src/engram/index.py tests/test_index.py
git commit -m "feat: add index module with open_index and schema creation"
```

---

### Task 3: Add upsert_entity and query_index

**Files:**
- Modify: `src/engram/index.py`
- Modify: `tests/test_index.py`

- [ ] **Step 1: Write tests for upsert and query**

Add to `tests/test_index.py`:

```python
from engram.index import open_index, query_index, upsert_entity


class TestUpsertAndQuery:
    """upsert_entity + query_index must round-trip metadata correctly."""

    def test_upsert_and_query_by_tag(self, tmp_path: Path) -> None:
        db = tmp_path / "test.sqlite"
        conn = open_index(db)
        assert conn is not None

        upsert_entity(conn, "session_01.md", 1.0, ["Jetson", "Memory"], "test", "2026-03-15")
        conn.commit()

        results = query_index(conn, tags=["Jetson"], date_from=None, date_to=None)
        assert results == ["session_01.md"]
        conn.close()

    def test_query_no_match(self, tmp_path: Path) -> None:
        db = tmp_path / "test.sqlite"
        conn = open_index(db)
        assert conn is not None

        upsert_entity(conn, "session_01.md", 1.0, ["Jetson"], "test", "2026-03-15")
        conn.commit()

        results = query_index(conn, tags=["Cron"], date_from=None, date_to=None)
        assert results == []
        conn.close()

    def test_empty_index_returns_empty(self, tmp_path: Path) -> None:
        db = tmp_path / "test.sqlite"
        conn = open_index(db)
        assert conn is not None

        results = query_index(conn, tags=["Jetson"], date_from=None, date_to=None)
        assert results == []
        conn.close()

    def test_date_range_filtering(self, tmp_path: Path) -> None:
        db = tmp_path / "test.sqlite"
        conn = open_index(db)
        assert conn is not None

        upsert_entity(conn, "old.md", 1.0, ["Memory"], "old", "2026-01-01")
        upsert_entity(conn, "new.md", 2.0, ["Memory"], "new", "2026-03-15")
        upsert_entity(conn, "nodate.md", 3.0, ["Memory"], "none", "")
        conn.commit()

        results = query_index(conn, tags=None, date_from="2026-03-01", date_to=None)
        assert results == ["new.md"]

        results = query_index(conn, tags=None, date_from=None, date_to="2026-02-01")
        assert results == ["old.md"]
        conn.close()

    def test_upsert_is_idempotent(self, tmp_path: Path) -> None:
        db = tmp_path / "test.sqlite"
        conn = open_index(db)
        assert conn is not None

        upsert_entity(conn, "a.md", 1.0, ["Jetson"], "v1", "2026-03-15")
        upsert_entity(conn, "a.md", 2.0, ["Jetson", "Memory"], "v2", "2026-03-15")
        conn.commit()

        count = conn.execute("SELECT COUNT(*) FROM entities").fetchone()[0]
        assert count == 1

        row = conn.execute("SELECT summary FROM entities WHERE path='a.md'").fetchone()
        assert row[0] == "v2"
        conn.close()
```

- [ ] **Step 2: Run tests to verify they fail**

Run: `pytest tests/test_index.py::TestUpsertAndQuery -v`
Expected: FAIL — `upsert_entity` and `query_index` not defined

- [ ] **Step 3: Implement upsert_entity and query_index**

Add to `src/engram/index.py`:

```python
def upsert_entity(
    conn: sqlite3.Connection,
    rel_path: str,
    mtime: float,
    topics: list[str],
    summary: str,
    created_at: str,
) -> None:
    """Insert or update one entity in the index."""
    topics_json = json.dumps(topics)

    if _fts5_available:
        # Content-sync: delete stale FTS row first
        conn.execute("DELETE FROM entities_fts WHERE path = ?", (rel_path,))

    conn.execute(
        "INSERT OR REPLACE INTO entities (path, mtime, topics, summary, created_at) "
        "VALUES (?, ?, ?, ?, ?)",
        (rel_path, mtime, topics_json, summary, created_at),
    )

    if _fts5_available:
        conn.execute(
            "INSERT INTO entities_fts(path, topics, summary) "
            "SELECT path, topics, summary FROM entities WHERE path = ?",
            (rel_path,),
        )


def query_index(
    conn: sqlite3.Connection,
    tags: list[str] | None = None,
    date_from: str | None = None,
    date_to: str | None = None,
) -> list[str]:
    """Return relative paths matching the given structured filters."""
    conditions: list[str] = []
    params: list[str] = []

    if tags:
        if _fts5_available:
            # FTS5 query: OR across tags, restricted to the topics column
            # via the "topics:" filter (FTS5 rejects a column name on the
            # left of MATCH); embedded quotes are doubled to stay literal
            fts_expr = " OR ".join(
                'topics:"{}"'.format(tag.replace('"', '""')) for tag in tags
            )
            conditions.append(
                "path IN (SELECT path FROM entities_fts "
                "WHERE entities_fts MATCH ?)"
            )
            params.append(fts_expr)
        else:
            # LIKE fallback — match any tag
            tag_clauses = []
            for tag in tags:
                tag_clauses.append("topics LIKE ?")
                params.append(f"%{tag}%")
            conditions.append(f"({' OR '.join(tag_clauses)})")

    if date_from:
        conditions.append("created_at != '' AND created_at >= ?")
        params.append(date_from)

    if date_to:
        conditions.append("created_at != '' AND created_at <= ?")
        params.append(date_to)

    if not conditions:
        return [row[0] for row in conn.execute("SELECT path FROM entities")]

    sql = "SELECT path FROM entities WHERE " + " AND ".join(conditions)
    return [row[0] for row in conn.execute(sql, params)]
```
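
The date filters above compare ISO strings with plain SQL `>=`/`<=`; that is sound because zero-padded `YYYY-MM-DD` strings sort lexicographically in the same order as chronologically. A self-contained check of that assumption:

```python
# Zero-padded ISO-8601 dates order lexicographically the same way they
# order chronologically, so query_index needs no SQLite date functions.
dates = ["2026-03-15", "2026-01-01", "2025-12-31"]
assert sorted(dates) == ["2025-12-31", "2026-01-01", "2026-03-15"]
assert "2026-03-15" >= "2026-03-01"        # the date_from >= filter
assert not ("2026-01-01" <= "2025-12-31")  # the date_to <= filter excludes later dates
```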

- [ ] **Step 4: Run tests to verify they pass**

Run: `pytest tests/test_index.py -v`
Expected: 8 PASSED (3 from Task 2 + 5 new)

- [ ] **Step 5: Commit**

```bash
git add src/engram/index.py tests/test_index.py
git commit -m "feat: add upsert_entity and query_index with FTS5/LIKE support"
```

---

### Task 4: Add reconcile function

**Files:**
- Modify: `src/engram/index.py`
- Modify: `tests/test_index.py`

- [ ] **Step 1: Write test for reconciliation**

Add to `tests/test_index.py`:

```python
from engram.index import open_index, reconcile, upsert_entity


class TestReconcile:
    """reconcile must purge index rows for deleted files."""

    def test_removes_orphaned_entries(self, tmp_path: Path) -> None:
        entities = tmp_path / "entities"
        entities.mkdir()
        (entities / "alive.md").write_text("# Alive")
        # "dead.md" does NOT exist on disk

        db = tmp_path / "test.sqlite"
        conn = open_index(db)
        assert conn is not None

        upsert_entity(conn, "alive.md", 1.0, ["Memory"], "alive", "")
        upsert_entity(conn, "dead.md", 1.0, ["Jetson"], "dead", "")
        conn.commit()

        removed = reconcile(conn, entities)

        assert removed == 1
        paths = [r[0] for r in conn.execute("SELECT path FROM entities").fetchall()]
        assert "alive.md" in paths
        assert "dead.md" not in paths
        conn.close()

    def test_no_orphans_returns_zero(self, tmp_path: Path) -> None:
        entities = tmp_path / "entities"
        entities.mkdir()
        (entities / "a.md").write_text("# A")

        db = tmp_path / "test.sqlite"
        conn = open_index(db)
        assert conn is not None

        upsert_entity(conn, "a.md", 1.0, [], "test", "")
        conn.commit()

        assert reconcile(conn, entities) == 0
        conn.close()
```

- [ ] **Step 2: Run tests to verify they fail**

Run: `pytest tests/test_index.py::TestReconcile -v`
Expected: FAIL — `reconcile` not defined

- [ ] **Step 3: Implement reconcile**

Add to `src/engram/index.py`:

```python
def reconcile(conn: sqlite3.Connection, entities_root: Path) -> int:
    """Purge index entries whose files no longer exist on disk.

    Returns the number of orphaned rows removed.
    """
    cursor = conn.execute("SELECT path FROM entities")
    orphans = [
        rel_path
        for (rel_path,) in cursor
        if not (entities_root / rel_path).exists()
    ]
    if orphans:
        for rel_path in orphans:
            if _fts5_available:
                conn.execute(
                    "DELETE FROM entities_fts WHERE path = ?", (rel_path,)
                )
            conn.execute("DELETE FROM entities WHERE path = ?", (rel_path,))
        conn.commit()
        logger.info("Reconciled %d orphaned index entries", len(orphans))
    return len(orphans)
```

- [ ] **Step 4: Run tests to verify they pass**

Run: `pytest tests/test_index.py -v`
Expected: 10 PASSED

- [ ] **Step 5: Commit**

```bash
git add src/engram/index.py tests/test_index.py
git commit -m "feat: add reconcile to purge orphaned index entries"
```

---

### Task 5: Add graceful degradation test

**Files:**
- Modify: `tests/test_index.py`

- [ ] **Step 1: Write test for missing index file**

Add to `tests/test_index.py`:

```python
class TestGracefulDegradation:
    """System must survive missing or corrupt index."""

    def test_missing_file_creates_new(self, tmp_path: Path) -> None:
        db = tmp_path / "nonexistent.sqlite"
        conn = open_index(db)
        assert conn is not None
        conn.close()

    def test_corrupt_file_returns_none(self, tmp_path: Path) -> None:
        """Already tested in TestOpenIndex — verify it's still passing."""
        db = tmp_path / "bad.sqlite"
        db.write_text("not a database")
        assert open_index(db) is None
```

- [ ] **Step 2: Run tests**

Run: `pytest tests/test_index.py::TestGracefulDegradation -v`
Expected: 2 PASSED

- [ ] **Step 3: Commit**

```bash
git add tests/test_index.py
git commit -m "test: add graceful degradation tests for index module"
```

---

### Task 6: Refactor _process_file to return ProcessResult

**Files:**
- Modify: `src/engram/librarian.py:65-124`
- Modify: `tests/test_librarian.py:73-141`

- [ ] **Step 1: Update tests to expect ProcessResult**

In `tests/test_librarian.py`, update the import line and test assertions:

```python
from engram.librarian import (
    ProcessResult,
    _load_checkpoint,
    _process_file,
    _quarantine,
    _save_checkpoint,
    _scan_entities,
)
```

Update `TestProcessFile` methods:

```python
def test_tags_bare_file(self, tmp_path: Path) -> None:
    f = tmp_path / "session.md"
    f.write_text("# Session\nDiscussing memory and engram vault.\n")
    quarantine = tmp_path / "corrupted"

    result = _process_file(f, tmp_path, quarantine, _noop_logger())

    assert result is not None
    assert isinstance(result, ProcessResult)
    assert "Memory" in result.topics_delta
    assert "Memory" in result.topics
    content = f.read_text()
    assert content.startswith("---\n")

def test_preserves_existing_frontmatter(self, tmp_path: Path) -> None:
    f = tmp_path / "existing.md"
    f.write_text(
        "---\nsummary: Important session.\ntopics: []\n---\n"
        "# Body\njetson nano board setup\n"
    )
    quarantine = tmp_path / "corrupted"

    result = _process_file(f, tmp_path, quarantine, _noop_logger())

    assert result is not None
    assert result.summary == "Important session."
    assert "Jetson" in result.topics
    content = f.read_text()
    fm = yaml.safe_load(content.split("---\n")[1])
    assert fm["summary"] == "Important session."
    assert "Jetson" in fm["topics"]

def test_malformed_yaml_survives(self, tmp_path: Path) -> None:
    """Corrupted YAML frontmatter must not crash — replace it."""
    f = tmp_path / "bad_yaml.md"
    f.write_text("---\ntitle: bad: yaml: here\n---\nBody.\n")
    quarantine = tmp_path / "corrupted"

    result = _process_file(f, tmp_path, quarantine, _noop_logger())

    assert result is not None
    assert isinstance(result, ProcessResult)
    content = f.read_text()
    assert content.startswith("---\n")
    assert "Body." in content

def test_no_keyword_match(self, tmp_path: Path) -> None:
    """Files with no keyword matches get empty topics list."""
    f = tmp_path / "plain.md"
    f.write_text("# Meeting Notes\nDiscussed quarterly targets.\n")
    quarantine = tmp_path / "corrupted"

    result = _process_file(f, tmp_path, quarantine, _noop_logger())

    assert result is not None
    assert result.topics_delta == {}
    assert result.topics == []
```

- [ ] **Step 2: Run tests to verify they fail**

Run: `pytest tests/test_librarian.py::TestProcessFile -v`
Expected: FAIL — `ProcessResult` not importable

- [ ] **Step 3: Refactor _process_file in librarian.py**

Add `import dataclasses` at the top of `src/engram/librarian.py`. Add the dataclass after the imports:

```python
@dataclasses.dataclass
class ProcessResult:
    """Result of processing a single entity file."""
    topics_delta: dict[str, int]
    topics: list[str]
    summary: str
    created_at: str
```

Change `_process_file` return type from `dict[str, int] | None` to `ProcessResult | None`.

**Specific edits to `src/engram/librarian.py`:**

1. **Line 75:** Delete `topics_delta: dict[str, int] = {}` (moved into the return block).

2. **Line 101 (after `fm.setdefault("related_ids", [])`):** Add:
```python
    fm.setdefault("created_at", "")
```

3. **Lines 121-124:** Replace the old return block:
```python
    # OLD (delete these lines):
    for t in fm["topics"]:
        topics_delta[t] = topics_delta.get(t, 0) + 1
    return topics_delta
```
With:
```python
    # NEW:
    topics_delta: dict[str, int] = {}
    for t in fm["topics"]:
        topics_delta[t] = topics_delta.get(t, 0) + 1

    return ProcessResult(
        topics_delta=topics_delta,
        topics=fm["topics"],
        summary=fm.get("summary", ""),
        created_at=fm.get("created_at", ""),
    )
```

The rest of `_process_file` (keyword tagging, write-back) remains unchanged.

Update `main()` where it consumes the result (around line 225):

```python
        if result is None:
            quarantined += 1
        else:
            processed.add(rel)
            for topic, count in result.topics_delta.items():
                topics_map[topic] = topics_map.get(topic, 0) + count
```

- [ ] **Step 4: Run all tests**

Run: `pytest tests/ -v --tb=short`
Expected: All tests PASS (52 existing, all green)

- [ ] **Step 5: Commit**

```bash
git add src/engram/librarian.py tests/test_librarian.py
git commit -m "refactor: _process_file returns ProcessResult dataclass"
```

---

### Task 7: Integrate index into Librarian

**Files:**
- Modify: `src/engram/librarian.py` (main function)

- [ ] **Step 1: Add index integration to main()**

In `src/engram/librarian.py`, add import at top:

```python
from engram.config import ENTITIES_DIR, INDEX_PATH, KEYWORD_TOPICS, LOG_DIR
from engram.index import open_index, reconcile, upsert_entity
```

In `main()`, after `ENTITIES_DIR.mkdir(exist_ok=True)` and the checkpoint logic, add:

```python
    # --- open metadata index ------------------------------------------------
    if args.full_index:
        # Delete stale index for clean rebuild
        if INDEX_PATH.exists():
            INDEX_PATH.unlink()
    idx_conn = open_index(INDEX_PATH)
    if idx_conn is None:
        logger.warning("Metadata index unavailable — indexing without it")
```

In the processing loop, after `processed.add(rel)` and the topic aggregation, add:

```python
            if idx_conn is not None:
                upsert_entity(
                    idx_conn, rel, filepath.stat().st_mtime,
                    result.topics, result.summary, result.created_at,
                )
```

In the periodic checkpoint block, after `_save_checkpoint(...)`, add:

```python
            if idx_conn is not None:
                idx_conn.commit()
```

Before the final flush section, add reconciliation:

```python
    # --- reconcile orphaned index entries -----------------------------------
    if idx_conn is not None:
        reconcile(idx_conn, ENTITIES_DIR)  # reconcile() logs removals itself
```

At the end of `main()`, before `return 0`, add:

```python
    if idx_conn is not None:
        idx_conn.commit()
        idx_conn.close()
```

- [ ] **Step 2: Run all tests**

Run: `pytest tests/ -v --tb=short`
Expected: All PASS

- [ ] **Step 3: Commit**

```bash
git add src/engram/librarian.py
git commit -m "feat: integrate metadata index into librarian ingestion loop"
```

---

### Task 8: Update Pydantic models

**Files:**
- Modify: `src/engram/models.py`
- Modify: `tests/test_server.py`

- [ ] **Step 1: Update tests for new model behavior**

In `tests/test_server.py`, update `TestSessionSearchInput`:

```python
def test_empty_query_rejected(self) -> None:
    """Empty query with no other filters must fail."""
    with pytest.raises(ValidationError, match="At least one"):
        SessionSearchInput(query="")

def test_tags_only_accepted(self) -> None:
    """Metadata-only query (tags, no text) must be valid."""
    m = SessionSearchInput(query="", tags=["Jetson"])
    assert m.tags == ["Jetson"]
    assert m.query == ""

def test_date_range_accepted(self) -> None:
    m = SessionSearchInput(query="", date_from="2026-03-01")
    assert m.date_from == "2026-03-01"

def test_invalid_date_rejected(self) -> None:
    with pytest.raises(ValidationError, match="date"):
        SessionSearchInput(query="test", date_from="not-a-date")

def test_schema_generation(self) -> None:
    schema = SessionSearchInput.model_json_schema()
    assert schema["properties"]["query"]["maxLength"] == MAX_QUERY_LENGTH
    assert "tags" in schema["properties"]
    assert "date_from" in schema["properties"]
    assert "session_id" in schema["properties"]
```

Also update `TestToolSchema.test_schema_uses_pydantic`:

```python
@pytest.mark.asyncio
async def test_schema_uses_pydantic(self, server: EngramServer) -> None:
    """Tool schema must be generated from the Pydantic model."""
    result = await server.list_tools(MagicMock())
    schema = result.tools[0].inputSchema
    assert schema["properties"]["query"]["maxLength"] == MAX_QUERY_LENGTH
    assert "tags" in schema["properties"]
```

Update `TestInputValidation.test_empty_query_rejected`:

```python
@pytest.mark.asyncio
async def test_empty_query_rejected(self, server: EngramServer) -> None:
    req = _make_request(query="")
    result = await server.call_tool(req)
    assert "Validation error" in result.content[0].text
```

Update `TestInputValidation.test_missing_query_field`:

```python
@pytest.mark.asyncio
async def test_missing_query_field(self, server: EngramServer) -> None:
    """Omitting query with no other filters must fail validation."""
    req = _make_request()
    result = await server.call_tool(req)
    assert "Validation error" in result.content[0].text
```

- [ ] **Step 2: Run tests to verify they fail**

Run: `pytest tests/test_server.py::TestSessionSearchInput -v`
Expected: FAIL — new methods don't match current model

- [ ] **Step 3: Update models.py**

Rewrite `src/engram/models.py`:

```python
"""Pydantic models for MCP tool input validation."""

from __future__ import annotations

import re

from pydantic import BaseModel, Field, field_validator, model_validator

from engram.config import MAX_QUERY_LENGTH

_SESSION_ID_RE = re.compile(r"^[A-Za-z0-9_.-]+$")
_DATE_RE = re.compile(r"^\d{4}-\d{2}-\d{2}$")


class SessionSearchInput(BaseModel):
    """Validated input for the ``get_session_context`` MCP tool."""

    query: str = Field(
        default="",
        max_length=MAX_QUERY_LENGTH,
        description="Search term (regex OK). Empty with tags/dates = metadata-only.",
    )
    session_id: str | None = Field(
        default=None,
        description="Filename prefix to scope the search",
    )
    tags: list[str] | None = Field(
        default=None,
        description="Filter by topic tags",
    )
    date_from: str | None = Field(
        default=None,
        description="ISO-8601 date lower bound (YYYY-MM-DD)",
    )
    date_to: str | None = Field(
        default=None,
        description="ISO-8601 date upper bound (YYYY-MM-DD)",
    )

    @field_validator("session_id")
    @classmethod
    def session_id_safe(cls, v: str | None) -> str | None:
        if v is not None and not _SESSION_ID_RE.match(v):
            raise ValueError(
                "session_id must contain only alphanumeric characters, "
                "dots, hyphens, and underscores"
            )
        return v

    @field_validator("date_from", "date_to")
    @classmethod
    def date_format_valid(cls, v: str | None) -> str | None:
        if v is not None and not _DATE_RE.match(v):
            raise ValueError("date must be in YYYY-MM-DD format")
        return v

    @model_validator(mode="after")
    def at_least_one_filter(self) -> SessionSearchInput:
        if (
            not self.query
            and not self.tags
            and not self.date_from
            and not self.date_to
            and not self.session_id
        ):
            raise ValueError(
                "At least one of query, tags, date_from, date_to, "
                "or session_id is required"
            )
        return self
```

- [ ] **Step 4: Run all tests**

Run: `pytest tests/ -v --tb=short`
Expected: All PASS

- [ ] **Step 5: Commit**

```bash
git add src/engram/models.py tests/test_server.py
git commit -m "feat: make query optional, add tags/date_from/date_to fields"
```

---

### Task 9: Update server to use metadata index

**Files:**
- Modify: `src/engram/server.py`

- [ ] **Step 1: Add structured query handling to server.py**

Update imports:

```python
from engram.config import ENTITIES_DIR, INDEX_PATH, MAX_RESPONSE_CHARS
from engram.index import open_index, query_index
```

Add index connection and metadata-only handler in `call_tool`:

```python
    async def call_tool(self, request: CallToolRequest) -> CallToolResult:
        if request.name != "get_session_context":
            return CallToolResult(
                content=[TextContent(type="text", text="Unknown tool")]
            )

        # --- Pydantic validation ------------------------------------------------
        try:
            params = SessionSearchInput(**request.arguments)
        except ValidationError as exc:
            return CallToolResult(
                content=[
                    TextContent(type="text", text=f"Validation error: {exc}")
                ]
            )

        # --- structured filter: query the index ---------------------------------
        candidate_paths: list[str] | None = None
        has_filters = params.tags or params.date_from or params.date_to

        if has_filters and INDEX_PATH.exists():
            idx_conn = open_index(INDEX_PATH)
            if idx_conn is not None:
                try:
                    candidates = query_index(
                        idx_conn, params.tags, params.date_from, params.date_to,
                    )
                    # Query-time existence check (stale-cache guard)
                    candidate_paths = [
                        p for p in candidates
                        if (ENTITIES_DIR / p).exists()
                    ]
                finally:
                    idx_conn.close()

        # --- metadata-only query (no text search) --------------------------------
        if has_filters and not params.query:
            if candidate_paths is None:
                return CallToolResult(
                    content=[TextContent(
                        type="text",
                        text="Metadata index unavailable. "
                        "Provide a text query or run engram-index first.",
                    )]
                )
            if not candidate_paths:
                return CallToolResult(
                    content=[TextContent(type="text", text="No matching sessions.")]
                )
            # Read frontmatter + first paragraph from each match
            output_parts: list[str] = []
            total_chars = 0
            for rel_path in candidate_paths:
                filepath = ENTITIES_DIR / rel_path
                try:
                    text = filepath.read_text(errors="replace")
                    # Take first 500 chars (frontmatter + opening)
                    snippet = text[:500]
                    if total_chars + len(snippet) > MAX_RESPONSE_CHARS:
                        break
                    output_parts.append(f"--- {rel_path} ---\n{snippet}\n")
                    total_chars += len(snippet) + len(rel_path) + 10
                except OSError:
                    continue
            return CallToolResult(
                content=[TextContent(type="text", text="".join(output_parts))]
            )

        # --- build ripgrep command ----------------------------------------------
        cmd: list[str] = ["rg", "--no-heading", "--with-filename"]

        if params.session_id:
            cmd.extend(["--glob", f"*{params.session_id}*"])

        # Narrow search to index candidates if available
        if candidate_paths is not None:
            if not candidate_paths:
                return CallToolResult(
                    content=[TextContent(
                        type="text",
                        text=f"No match for '{params.query}' (no files match filters).",
                    )]
                )
            # Pass candidate files as explicit arguments (not --glob,
            # which matches filenames only and fails for subdirectory paths)
            cmd.append(params.query)
            cmd.extend(str(ENTITIES_DIR / p) for p in candidate_paths)
        else:
            cmd.extend([params.query, str(ENTITIES_DIR)])

        # --- execute asynchronously with timing ---------------------------------
        t0 = time.perf_counter()
        try:
            proc = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            stdout, stderr = await proc.communicate()
        except FileNotFoundError:
            return CallToolResult(
                content=[
                    TextContent(
                        type="text",
                        text="ripgrep (rg) is not installed or not on PATH.",
                    )
                ]
            )

        elapsed_ms = (time.perf_counter() - t0) * 1000
        logger.info(
            "tool=get_session_context query=%r session_id=%r "
            "tags=%r date_from=%r date_to=%r "
            "candidates=%s returncode=%d elapsed_ms=%.1f",
            params.query,
            params.session_id,
            params.tags,
            params.date_from,
            params.date_to,
            len(candidate_paths) if candidate_paths is not None else "all",
            proc.returncode,
            elapsed_ms,
        )

        if proc.returncode == 0:
            text = stdout.decode(errors="replace")[:MAX_RESPONSE_CHARS]
            content = TextContent(type="text", text=text)
        else:
            err = stderr.decode(errors="replace").strip()
            content = TextContent(
                type="text", text=f"No match for '{params.query}'. {err}"
            )

        return CallToolResult(content=[content])
```

- [ ] **Step 2: Run all tests**

Run: `pytest tests/ -v --tb=short`
Expected: All PASS

- [ ] **Step 3: Run linters**

Run: `ruff check src/ tests/ && mypy src/engram/`
Expected: All checks passed

- [ ] **Step 4: Commit**

```bash
git add src/engram/server.py
git commit -m "feat: server queries metadata index for structured filters"
```

---

### Task 10: Final validation and CI check

**Files:** None (verification only)

- [ ] **Step 1: Run full test suite**

Run: `pytest tests/ -v --tb=short`
Expected: All PASS (52 existing + ~12 new = ~64 tests), total time < 1s

- [ ] **Step 2: Run linters**

Run: `ruff check src/ tests/ && mypy src/engram/`
Expected: Zero findings

- [ ] **Step 3: Verify graceful degradation (no index file)**

Run: `python -c "from pathlib import Path; from engram.index import open_index; print(open_index(Path('/tmp/nonexistent.sqlite')))"`
Expected: prints a connection object (creates fresh DB)

Run: `python -c "from pathlib import Path; Path('/tmp/bad.sqlite').write_text('corrupt'); from engram.index import open_index; print(open_index(Path('/tmp/bad.sqlite')))"`
Expected: prints `None`

- [ ] **Step 4: Commit all remaining changes**

```bash
git add -A
git commit -m "feat: Phase 3 metadata indexing — SQLite cache with FTS5 and graceful degradation"
```
