Source code for telix.rooms

"""
Room graph tracking, BFS pathfinding, and SQLite persistence for GMCP Room.Info data.

Incrementally builds a directed graph from GMCP ``Room.Info`` messages,
supports shortest-path search via BFS, and persists per-session room
data to ``~/.local/share/telix/rooms-{host}_{port}.db``.
"""

import os
import re
import json
import random
import typing
import hashlib
import sqlite3
import datetime
import collections
import dataclasses

from . import paths

EXIT_DIR_RE = re.compile(
    r"\s*"
    r"(?:\{[^}]*\}\s*)?"
    r"\[(?:[nswe]|n[ew]|s[ew]|[a-z]+)"
    r"(?:,(?:[nswe]|n[ew]|s[ew]|[a-z]+))*\]"
    r"\s*$"
)


ROOM_ID_KEYS = ("num", "vnum", "id")


def room_id(info: dict[str, typing.Any]) -> str | None:
    """
    Extract the room identifier from a GMCP ``Room.Info`` payload.

    Checks ``num``, ``vnum``, and ``id`` in priority order.  When none
    of these keys are present, a 12-character SHA-1 hash is generated
    from the room name and full exit data (direction + destination name
    pairs) to produce a stable synthetic identifier.

    :param info: GMCP Room.Info dict.
    :returns: Room identifier as a string, or ``None`` if no key is found.
    """
    for key in ROOM_ID_KEYS:
        if key in info:
            return str(info[key])
    name = info.get("name")
    if name is None:
        return None
    exits = info.get("exits")
    if isinstance(exits, dict) and exits:
        pairs = ",".join(f"{k}:{exits[k]}" for k in sorted(exits))
        fingerprint = f"{name}|{pairs}"
    else:
        fingerprint = str(name)
    return hashlib.sha1(fingerprint.encode("utf-8")).hexdigest()[:12]


def strip_exit_dirs(name: str) -> str:
    """
    Strip trailing exit-direction lists like ``[n,s,w,e]`` from a room name.

    Also handles optional ``{SPICE}``-style tags before the bracket list.
    """
    return EXIT_DIR_RE.sub("", name)


[docs] @dataclasses.dataclass class Room: """ A single room in the GMCP room graph. Accessible in scripts as ``ctx.room`` (current room) or via :meth:`~telix.rooms.RoomStore.get_room`. """ num: str name: str = "" area: str = "" environment: str = "" exits: dict[str, str] = dataclasses.field(default_factory=dict) bookmarked: bool = False visit_count: int = 0 last_visited: str = "" blocked: bool = False home: bool = False marked: bool = False
[docs] class RoomStore: """ SQLite-backed room graph with in-memory adjacency cache. Accessible in scripts as ``ctx.room_graph``. """ def __init__(self, db_path: str, read_only: bool = False, session_key: str = "") -> None: """ Open or create an SQLite room database. :param db_path: Path to the ``.db`` file. :param read_only: Open in read-only mode (no table creation). :param session_key: ``host:port`` identifier stored as metadata. """ dir_path = os.path.dirname(db_path) if dir_path: os.makedirs(dir_path, exist_ok=True) mode = "ro" if read_only else "rwc" uri = f"file:{db_path}?mode={mode}" self.conn = sqlite3.connect(uri, uri=True) self.conn.execute("PRAGMA journal_mode=WAL") self.conn.execute("PRAGMA synchronous=NORMAL") self.conn.execute("PRAGMA temp_store=MEMORY") self.conn.execute("PRAGMA cache_size=-4000") # 4 MB self.conn.execute("PRAGMA mmap_size=8388608") # 8 MB if not read_only: self.create_tables() if session_key: self.conn.execute("INSERT OR REPLACE INTO meta VALUES ('session_key', ?)", (session_key,)) self.conn.commit() self.adj: dict[str, dict[str, str]] = {} self.load_adjacency() def create_tables(self) -> None: """Create schema tables if they do not exist.""" self.conn.executescript(""" CREATE TABLE IF NOT EXISTS room ( num TEXT PRIMARY KEY, name TEXT NOT NULL DEFAULT '', area TEXT NOT NULL DEFAULT '', environment TEXT NOT NULL DEFAULT '', bookmarked INTEGER NOT NULL DEFAULT 0, visit_count INTEGER NOT NULL DEFAULT 0, last_visited TEXT NOT NULL DEFAULT '' ); CREATE TABLE IF NOT EXISTS exit ( src_num TEXT NOT NULL, direction TEXT NOT NULL, dst_num TEXT NOT NULL, PRIMARY KEY (src_num, direction) ); CREATE TABLE IF NOT EXISTS meta ( key TEXT PRIMARY KEY, value TEXT NOT NULL ); """) for col in ("blocked", "home", "marked"): try: self.conn.execute(f"ALTER TABLE room ADD COLUMN {col} INTEGER NOT NULL DEFAULT 0") except sqlite3.OperationalError: pass self.conn.execute("INSERT OR IGNORE INTO meta VALUES ('version', '1')") self.conn.commit() def load_adjacency(self) -> None: """Load full adjacency graph into memory for BFS.""" self.adj.clear() try: for src, direction, dst in self.conn.execute("SELECT src_num, direction, dst_num FROM exit"): self.adj.setdefault(src, {})[direction] = dst except sqlite3.OperationalError: pass def close(self) -> None: """Close the database connection.""" self.conn.close() def row_to_room(self, row: tuple[typing.Any, ...]) -> Room: """Convert a SELECT row to a :class:`Room`.""" num = row[0] exits = dict(self.adj.get(num, {})) return Room( num=num, name=row[1], area=row[2], environment=row[3], exits=exits, bookmarked=bool(row[4]), visit_count=row[5], last_visited=row[6], blocked=bool(row[7]), home=bool(row[8]), marked=bool(row[9]), ) ROOM_COLS = "num, name, area, environment, bookmarked, visit_count, last_visited, blocked, home, marked" @property def rooms(self) -> dict[str, Room]: """ Return all rooms as a dict (for compatibility). Not cached. """ result: dict[str, Room] = {} for row in self.conn.execute(f"SELECT {self.ROOM_COLS} FROM room"): room = self.row_to_room(row) result[room.num] = room return result def room_summaries(self) -> list[tuple[str, str, str, int, bool, str, bool, bool, bool]]: """ Return lightweight room summary tuples. Each tuple is ``(num, name, area, exit_count, bookmarked, last_visited, blocked, home, marked)``. Counts exits via SQL aggregation instead of materialising :class:`Room` objects, which avoids copying exit dicts for every room. """ rows = self.conn.execute( "SELECT r.num, r.name, r.area, COUNT(e.direction)," " r.bookmarked, r.last_visited, r.blocked, r.home, r.marked" " FROM room r LEFT JOIN exit e ON r.num = e.src_num" " GROUP BY r.num" ).fetchall() return [(r[0], r[1], r[2], r[3], bool(r[4]), r[5], bool(r[6]), bool(r[7]), bool(r[8])) for r in rows] def room_area(self, num: str) -> str: """Return the area of a single room, or ``""`` if not found.""" row = self.conn.execute("SELECT area FROM room WHERE num = ?", (num,)).fetchone() return row[0] if row else ""
[docs] def get_room(self, num: str) -> Room | None: """ Get a single room by number. :param num: Room number. :returns: :class:`Room` or ``None`` if not found. """ row = self.conn.execute(f"SELECT {self.ROOM_COLS} FROM room WHERE num = ?", (num,)).fetchone() if row is None: return None return self.row_to_room(row)
[docs] def has_room(self, num: str) -> bool: """Return ``True`` if room *num* exists in the database.""" row = self.conn.execute("SELECT 1 FROM room WHERE num = ?", (num,)).fetchone() return row is not None
def update_room(self, info: dict[str, typing.Any]) -> None: """ Update or create a room from a GMCP ``Room.Info`` payload. :param info: GMCP Room.Info dict with a room identifier key. """ num = room_id(info) or "" exits = info.get("exits", {}) if isinstance(exits, dict): exits = {str(k): str(v) for k, v in exits.items() if v} else: exits = {} name = strip_exit_dirs(str(info.get("name", ""))) area = str(info.get("area", "")) environment = str(info.get("environment", "")) now = datetime.datetime.now(datetime.timezone.utc).isoformat() self.conn.execute( "INSERT INTO room" " (num, name, area, environment, visit_count, last_visited)" " VALUES (?, ?, ?, ?, 1, ?)" " ON CONFLICT(num) DO UPDATE SET" " name=excluded.name, area=excluded.area," " environment=excluded.environment," " visit_count=visit_count+1," " last_visited=excluded.last_visited", (num, name, area, environment, now), ) self.conn.execute("DELETE FROM exit WHERE src_num = ?", (num,)) if exits: self.conn.executemany( "INSERT INTO exit (src_num, direction, dst_num) VALUES (?, ?, ?)", [(num, d, dst) for d, dst in exits.items()], ) self.conn.commit() self.adj[num] = exits MARKER_COLS = ("bookmarked", "blocked", "home", "marked") def set_marker(self, num: str, marker: str) -> bool: """ Toggle a marker on a room, clearing all other markers. Markers are mutually exclusive: only one of ``bookmarked``, ``blocked``, ``home``, ``marked`` can be set at a time. If the requested marker is already set, all markers are cleared. For ``home``, the one-per-area constraint is also enforced. :param num: Room number. :param marker: One of ``"bookmarked"``, ``"blocked"``, ``"home"``, ``"marked"``. :returns: New state of *marker*, or ``False`` if room not found. """ if marker not in self.MARKER_COLS: raise ValueError(f"unknown marker: {marker!r}") row = self.conn.execute(f"SELECT {marker}, area FROM room WHERE num = ?", (num,)).fetchone() if row is None: return False new_state = not bool(row[0]) self.conn.execute("UPDATE room SET bookmarked=0, blocked=0, home=0, marked=0 WHERE num = ?", (num,)) if new_state: self.conn.execute(f"UPDATE room SET {marker} = 1 WHERE num = ?", (num,)) if marker == "home": area = row[1] self.conn.execute("UPDATE room SET home = 0 WHERE area = ? AND num != ?", (area, num)) self.conn.commit() return new_state
[docs] def toggle_bookmark(self, num: str) -> bool: """Toggle bookmark on a room (exclusive with other markers).""" return self.set_marker(num, "bookmarked")
[docs] def toggle_blocked(self, num: str) -> bool: """Toggle blocked state on a room (exclusive with other markers).""" return self.set_marker(num, "blocked")
[docs] def toggle_home(self, num: str) -> bool: """Toggle home state on a room (exclusive with other markers).""" return self.set_marker(num, "home")
[docs] def toggle_marked(self, num: str) -> bool: """Toggle mark on a room (exclusive with other markers).""" return self.set_marker(num, "marked")
[docs] def get_home_for_area(self, area: str) -> str | None: """ Return the home room number for the given area. :param area: Area name. :returns: Room number string, or ``None`` if no home is set. """ row = self.conn.execute("SELECT num FROM room WHERE area = ? AND home = 1", (area,)).fetchone() return row[0] if row else None
[docs] def blocked_rooms(self) -> frozenset[str]: """Return frozenset of all blocked room numbers.""" return frozenset(row[0] for row in self.conn.execute("SELECT num FROM room WHERE blocked = 1"))
[docs] def room_nums(self) -> frozenset[str]: """Return the set of all room numbers in the database.""" return frozenset(row[0] for row in self.conn.execute("SELECT num FROM room"))
def bfs_distances(self, src: str, blocked: frozenset[str] = frozenset()) -> dict[str, int]: """ BFS from *src* returning distance to every reachable room. :param src: Source room number. :param blocked: Room numbers to treat as impassable. :returns: ``{room_num: distance}`` for all reachable rooms. """ known = self.room_nums() if src not in known: return {} distances: dict[str, int] = {src: 0} queue: collections.deque[str] = collections.deque([src]) while queue: current = queue.popleft() d = distances[current] for target in self.adj.get(current, {}).values(): if target not in distances and target in known and target not in blocked: distances[target] = d + 1 queue.append(target) return distances
[docs] def find_path(self, src: str, dst: str, blocked: frozenset[str] = frozenset()) -> list[str] | None: """ BFS shortest path from *src* to *dst*. :param blocked: Room numbers to treat as impassable. :returns: List of direction names, or ``None`` if unreachable. """ if src == dst: return [] if not self.has_room(src): return None visited: set[str] = {src} queue: collections.deque[tuple[str, list[str]]] = collections.deque([(src, [])]) while queue: current, path = queue.popleft() for direction, target in self.adj.get(current, {}).items(): if target == dst: return path + [direction] if target not in visited and self.has_room(target) and target not in blocked: visited.add(target) queue.append((target, path + [direction])) return None
def find_path_with_rooms( self, src: str, dst: str, blocked: frozenset[str] = frozenset() ) -> list[tuple[str, str]] | None: """ BFS shortest path returning ``[(direction, target_room_num), ...]``. :param blocked: Room numbers to treat as impassable. :returns: List of (direction, expected_room_num) pairs, or ``None``. """ if src == dst: return [] if not self.has_room(src): return None visited: set[str] = {src} queue: collections.deque[tuple[str, list[tuple[str, str]]]] = collections.deque([(src, [])]) while queue: current, path = queue.popleft() for direction, target in self.adj.get(current, {}).items(): if target == dst: return path + [(direction, target)] if target not in visited and self.has_room(target) and target not in blocked: visited.add(target) queue.append((target, path + [(direction, target)])) return None
[docs] def find_same_name(self, num: str, limit: int = 99) -> list[Room]: """ Find rooms with the same name as *num*, sorted by least-recently-visited. :param num: Room number to match name against. :param limit: Maximum results to return. :returns: List of matching rooms, excluding *num* itself. """ row = self.conn.execute("SELECT name FROM room WHERE num = ?", (num,)).fetchone() if row is None or not row[0]: return [] target_name = row[0] rows = self.conn.execute( f"SELECT {self.ROOM_COLS} FROM room WHERE name = ? AND num != ? ORDER BY last_visited ASC LIMIT ?", (target_name, num, limit), ).fetchall() return [self.row_to_room(r) for r in rows]
def find_branches( self, src: str, limit: int = 99, blocked: frozenset[str] = frozenset(), strategy: str = "bfs" ) -> list[tuple[str, str, str]]: """ Find exits from known rooms leading to unvisited or unknown rooms. :param src: Source room number to search from. :param limit: Maximum number of branches to return. :param blocked: Room numbers to treat as impassable. :param strategy: ``"bfs"`` for nearest-first, ``"dfs"`` for deepest-first ordering. :returns: ``[(gateway_room_num, direction, target_num), ...]`` sorted by BFS distance from *src*. """ if not self.has_room(src): return [] visited: set[str] = {src} queue: collections.deque[tuple[str, int]] = collections.deque([(src, 0)]) branches: list[tuple[int, str, str, str]] = [] while queue: current, dist = queue.popleft() for direction, target in self.adj.get(current, {}).items(): if target in blocked: continue target_vc = self.conn.execute("SELECT visit_count FROM room WHERE num = ?", (target,)).fetchone() if target_vc is None or target_vc[0] == 0: branches.append((dist, current, direction, target)) elif target not in visited: visited.add(target) queue.append((target, dist + 1)) reverse = strategy == "dfs" branches.sort(key=lambda b: b[0], reverse=reverse) shuffled: list[tuple[int, str, str, str]] = [] i = 0 while i < len(branches): dist = branches[i][0] j = i while j < len(branches) and branches[j][0] == dist: j += 1 tier = branches[i:j] random.shuffle(tier) shuffled.extend(tier) i = j return [(gw, d, t) for _, gw, d, t in shuffled[:limit]]
[docs] def search(self, query: str) -> list[Room]: """ Case-insensitive substring search on room name, area, and ID. :param query: Search string. :returns: Matching rooms sorted bookmarked-first, then by name. """ q = f"%{query}%" rows = self.conn.execute( f"SELECT {self.ROOM_COLS}" " FROM room WHERE name LIKE ? COLLATE NOCASE" " OR area LIKE ? COLLATE NOCASE" " OR num LIKE ? COLLATE NOCASE", (q, q, q), ).fetchall() results = [self.row_to_room(r) for r in rows] results.sort(key=lambda r: (not r.bookmarked, r.name.lower())) return results
RoomGraph = RoomStore def xdg_data_dir() -> str: """Return XDG data directory for telix.""" return paths.DATA_DIR def session_file_path(prefix: str, session_key: str, ext: str = "") -> str: """Return a per-session file path under the XDG data directory.""" return os.path.join(xdg_data_dir(), f"{prefix}{paths.safe_session_slug(session_key)}{ext}") def rooms_path(session_key: str) -> str: """Return path to room graph SQLite DB for *session_key* (``host:port``).""" return session_file_path("rooms-", session_key, ".db") def current_room_path(session_key: str) -> str: """Return path to current room number file for *session_key*.""" return session_file_path(".current-room-", session_key) def fasttravel_path(session_key: str) -> str: """Return path to fast travel command file for *session_key*.""" return session_file_path(".fasttravel-", session_key) def prefs_path(session_key: str) -> str: """Return path to preferences JSON for *session_key* (``host:port``).""" return session_file_path("prefs-", session_key, ".json") def load_prefs(session_key: str) -> dict[str, bool | str]: """ Load per-session preferences from disk. :param session_key: Session identifier (``host:port``). :returns: Dict of preference values (booleans and strings). """ path = prefs_path(session_key) try: with open(path, encoding="utf-8") as f: data = json.load(f) if isinstance(data, dict): result: dict[str, bool | str] = {} for k, v in data.items(): if isinstance(v, str): result[str(k)] = v else: result[str(k)] = bool(v) return result except (OSError, ValueError): pass return {} def save_prefs(session_key: str, prefs: dict[str, bool | str]) -> None: """ Atomically save per-session preferences to disk. :param session_key: Session identifier (``host:port``). :param prefs: Dict of preference values (booleans and strings). """ path = prefs_path(session_key) paths.atomic_write(path, json.dumps(prefs, separators=(",", ":"))) def write_current_room(path: str, room_num: str) -> None: """ Write the current room number to a small file for TUI subprocess. :param path: File path. :param room_num: Current room number string. """ paths.atomic_write(path, room_num) def read_current_room(path: str) -> str: """ Read the current room number from disk. :param path: File path written by :func:`write_current_room`. :returns: Room number string, or empty string if unavailable. """ try: with open(path, encoding="utf-8") as f: return f.read().strip() except (OSError, ValueError): return "" def write_fasttravel(path: str, steps: list[tuple[str, str]], noreply: bool = False) -> None: """ Write fast travel steps to disk for the REPL to read. :param path: File path. :param steps: List of (direction, expected_room_num) pairs. :param noreply: If ``True``, disable triggers during travel. """ data = {"steps": steps, "noreply": noreply} paths.atomic_write(path, json.dumps(data)) def read_fasttravel(path: str) -> tuple[list[tuple[str, str]], bool]: """ Read and delete fast travel steps from disk. :param path: File path written by :func:`write_fasttravel`. :returns: Tuple of ``(steps, noreply)`` where *steps* is a list of ``(direction, expected_room_num)`` pairs and *noreply* indicates whether triggers should be disabled. """ try: with open(path, encoding="utf-8") as f: data = json.load(f) os.unlink(path) if isinstance(data, dict): steps = [(str(d), str(r)) for d, r in data.get("steps", [])] noreply = bool(data.get("noreply", False)) return steps, noreply return [(str(d), str(r)) for d, r in data], False except (OSError, ValueError): return [], False