Skip to content

Motion Database

database

Motion database: storage, similarity, retrieval.

Classes

MotionDatabase

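Store trajectories as invariants, with an optional FAISS index for fast retrieval. `add(positions, quaternions, metadata)` encodes a trajectory and stores it; `retrieve(query_positions, query_quaternions, k)` returns the top-k most similar stored trajectories (by L2 distance on the invariants, or by soft-DTW).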

Source code in src/dhb_xr/database/motion_db.py
class MotionDatabase:
    """
    Store trajectories as invariant sequences and retrieve the most similar ones.

    ``add(positions, quaternions, metadata)`` encodes a trajectory with
    ``encode_dhb_dr`` and stores the concatenated linear/angular invariants.
    ``retrieve(query_positions, query_quaternions, k)`` returns the top-k
    stored entries ranked by ``invariant_distance`` (default), by
    ``soft_dtw_distance`` (``use_dtw=True``), or through a FAISS flat L2 index
    when one is enabled and covers every stored entry.
    """

    def __init__(self, dhb_method: str = "double_reflection", use_faiss: bool = False):
        """
        Create an empty database.

        Args:
            dhb_method: ``"double_reflection"`` selects
                ``DHBMethod.DOUBLE_REFLECTION``; any other string selects
                ``DHBMethod.ORIGINAL``.
            use_faiss: Request a FAISS index. Only honoured when ``HAS_FAISS``
                is truthy; otherwise retrieval is always brute force.
        """
        self.dhb_method = DHBMethod.DOUBLE_REFLECTION if dhb_method == "double_reflection" else DHBMethod.ORIGINAL
        # Not read by any method of this class; kept for external users.
        self.k_inv = 4 if self.dhb_method == DHBMethod.DOUBLE_REFLECTION else 3
        self.invariants_list: List[np.ndarray] = []
        self.metadata_list: List[Dict[str, Any]] = []
        self.use_faiss = use_faiss and HAS_FAISS
        # Both are created lazily by the first non-empty add().
        self.index: Optional[Any] = None
        self.embedding_dim: Optional[int] = None

    def _encode(self, positions: np.ndarray, quaternions: np.ndarray) -> np.ndarray:
        """Encode a trajectory and join linear and angular invariants column-wise."""
        out = encode_dhb_dr(
            positions, quaternions,
            method=EncodingMethod.POSITION, use_default_initial_frames=True, dhb_method=self.dhb_method,
        )
        return np.concatenate([
            out["linear_motion_invariants"],
            out["angular_motion_invariants"],
        ], axis=1)

    def _top_k(self, dists: List[float], k: int) -> List[tuple]:
        """Return ``(invariants, metadata, distance)`` for the ``k`` smallest ``dists``."""
        order = np.argsort(dists)[:k]
        return [(self.invariants_list[i], self.metadata_list[i], dists[i]) for i in order]

    def _faiss_usable(self) -> bool:
        """
        Tell whether FAISS row ids line up with ``invariants_list`` positions.

        ``add()`` skips the index for empty or differently sized embeddings,
        so the two only agree while every stored entry has been indexed.
        """
        return bool(
            self.use_faiss
            and self.index is not None
            and self.embedding_dim is not None
            and self.index.ntotal == len(self.invariants_list)
        )

    def add(
        self,
        positions: np.ndarray,
        quaternions: np.ndarray,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> None:
        """
        Encode a trajectory and append it (with its metadata) to the database.

        Args:
            positions: Trajectory positions, forwarded to ``encode_dhb_dr``.
            quaternions: Trajectory orientations, forwarded to ``encode_dhb_dr``.
            metadata: Optional dict stored alongside the entry; ``None`` or an
                empty dict is stored as ``{}``.
        """
        U = self._encode(positions, quaternions)
        self.invariants_list.append(U.astype(np.float32))
        self.metadata_list.append(metadata or {})
        if not self.use_faiss or U.size == 0:
            return
        emb = U.flatten().reshape(1, -1).astype(np.float32)
        if self.embedding_dim is None:
            # The first indexed trajectory fixes the embedding size.
            self.embedding_dim = emb.shape[1]
            self.index = faiss.IndexFlatL2(self.embedding_dim)
        # Entries of another length cannot be indexed; retrieve() notices the
        # gap through _faiss_usable() and ranks by brute force instead.
        if emb.shape[1] == self.embedding_dim:
            self.index.add(emb)

    def retrieve(
        self,
        query_positions: np.ndarray,
        query_quaternions: np.ndarray,
        k: int = 10,
        use_dtw: bool = False,
    ) -> List[tuple]:
        """
        Return up to ``k`` stored entries most similar to the query trajectory.

        Args:
            query_positions: Query positions, forwarded to ``encode_dhb_dr``.
            query_quaternions: Query orientations, forwarded to ``encode_dhb_dr``.
            k: Maximum number of results; ``k <= 0`` yields an empty list.
            use_dtw: Rank with ``soft_dtw_distance`` instead of L2.

        Returns:
            A list of ``(invariants, metadata, distance)`` tuples ordered by
            increasing distance. An empty database yields ``[]``.
        """
        if k <= 0 or not self.invariants_list:
            return []
        U_q = self._encode(query_positions, query_quaternions)
        if use_dtw:
            return self._top_k([soft_dtw_distance(U_q, U) for U in self.invariants_list], k)

        # Use FAISS only when it indexes every entry and the query size matches.
        if self._faiss_usable():
            flat = U_q.flatten().astype(np.float32)
            if flat.shape[0] == self.embedding_dim:
                k_actual = min(k, len(self.invariants_list))
                distances, indices = self.index.search(flat.reshape(1, -1), k_actual)
                # IndexFlatL2 reports squared L2; take the root so distances
                # are on the same scale as the brute-force path below.
                return [
                    (
                        self.invariants_list[int(i)],
                        self.metadata_list[int(i)],
                        float(np.sqrt(max(float(d), 0.0))),
                    )
                    for d, i in zip(distances[0], indices[0])
                    if i >= 0
                ]

        # Brute-force fallback
        return self._top_k([invariant_distance(U_q, U) for U in self.invariants_list], k)

    def save(self, path: str) -> None:
        """Persist the database to disk.  Writes ``<path>.npz`` (invariants) and ``<path>.json`` (metadata)."""
        arrs = {f"inv_{i}": inv for i, inv in enumerate(self.invariants_list)}
        np.savez(path + ".npz", **arrs)
        meta = {
            "dhb_method": "double_reflection" if self.dhb_method == DHBMethod.DOUBLE_REFLECTION else "original",
            "metadata_list": self.metadata_list,
        }
        with open(path + ".json", "w") as f:
            json.dump(meta, f)

    @classmethod
    def load(cls, path: str) -> "MotionDatabase":
        """
        Load a database from ``<path>.npz`` and ``<path>.json``.

        The returned instance is created with ``use_faiss=False``; no FAISS
        index is rebuilt from the stored invariants.
        """
        with open(path + ".json", "r", encoding="utf-8") as f:
            meta = json.load(f)
        db = cls(dhb_method=meta["dhb_method"], use_faiss=False)
        db.metadata_list = meta["metadata_list"]
        # np.load keeps the .npz archive open until closed, so read every
        # array inside the context manager.
        with np.load(path + ".npz") as data:
            db.invariants_list = [
                data[f"inv_{i}"].astype(np.float32) for i in range(len(db.metadata_list))
            ]
        return db
Functions
load classmethod
load(path)

Load a database from <path>.npz and <path>.json.

Source code in src/dhb_xr/database/motion_db.py
@classmethod
def load(cls, path: str) -> "MotionDatabase":
    """
    Load a database from ``<path>.npz`` and ``<path>.json``.

    The JSON file supplies ``dhb_method`` and ``metadata_list``; one array
    named ``inv_<i>`` is read from the archive per metadata entry. The
    instance is created with ``use_faiss=False``.
    """
    with open(path + ".json", "r", encoding="utf-8") as f:
        meta = json.load(f)
    db = cls(dhb_method=meta["dhb_method"], use_faiss=False)
    db.metadata_list = meta["metadata_list"]
    # np.load keeps the .npz archive open until closed, so read every array
    # inside the context manager.
    with np.load(path + ".npz") as data:
        db.invariants_list = [
            data[f"inv_{i}"].astype(np.float32) for i in range(len(db.metadata_list))
        ]
    return db
save
save(path)

Persist the database to disk. Writes <path>.npz (invariants) and <path>.json (metadata).

Source code in src/dhb_xr/database/motion_db.py
def save(self, path: str) -> None:
    """
    Write the database to ``<path>.npz`` and ``<path>.json``.

    The archive holds one array per stored entry under the key ``inv_<i>``;
    the JSON file holds the DHB method name and the metadata list.
    """
    if self.dhb_method == DHBMethod.DOUBLE_REFLECTION:
        method_name = "double_reflection"
    else:
        method_name = "original"

    named_arrays = {}
    for position, invariants in enumerate(self.invariants_list):
        named_arrays[f"inv_{position}"] = invariants
    np.savez(path + ".npz", **named_arrays)

    payload = {"dhb_method": method_name, "metadata_list": self.metadata_list}
    with open(path + ".json", "w") as handle:
        json.dump(payload, handle)

Functions

invariant_distance

invariant_distance(U, V, weights=None, use_quat_geodesic=False, quat_start_idx=1, quat_len=4)

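Aligned L2 distance between two invariant sequences. U and V have shape (N, 2*k) or (N, k_lin + k_ang); when their lengths differ, the shorter sequence is interpolated to the length of the longer one. If use_quat_geodesic is set, the quaternion part of the invariants is compared with the geodesic distance instead of the component-wise difference.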

Source code in src/dhb_xr/database/similarity.py
def invariant_distance(
    U: np.ndarray,
    V: np.ndarray,
    weights: Optional[np.ndarray] = None,
    use_quat_geodesic: bool = False,
    quat_start_idx: int = 1,
    quat_len: int = 4,
) -> float:
    """
    Weighted L2 distance between two invariant sequences.

    Args:
        U, V: 2-D arrays of shape (N, k) with the same ``k``. If the row
            counts differ, the shorter one is resampled to the longer length
            with ``_interpolate_sequence``.
        weights: Per-feature weights multiplied into the squared differences;
            defaults to all ones.
        use_quat_geodesic: Replace the component-wise difference of each
            quaternion slice by ``quaternion_geodesic_distance``.
        quat_start_idx: Offset of the quaternion inside each half of the
            feature vector.
        quat_len: Number of quaternion components.

    Returns:
        ``sqrt(sum(weights * diff ** 2))`` as a Python float.

    Raises:
        ValueError: If an input is not 2-D or the feature dimensions differ.
    """
    U = np.asarray(U, dtype=np.float64)
    V = np.asarray(V, dtype=np.float64)
    if U.ndim != 2 or V.ndim != 2:
        raise ValueError("U and V must be 2-D arrays (N, k)")
    if U.shape[1] != V.shape[1]:
        raise ValueError(f"Feature dimension mismatch: {U.shape[1]} vs {V.shape[1]}")
    # Interpolate shorter sequence to match the longer one
    target_len = max(U.shape[0], V.shape[0])
    if U.shape[0] != target_len:
        U = _interpolate_sequence(U, target_len)
    if V.shape[0] != target_len:
        V = _interpolate_sequence(V, target_len)
    if weights is None:
        weights = np.ones(U.shape[1])
    weights = np.asarray(weights).reshape(-1)
    diff = U - V
    if use_quat_geodesic:
        # The feature vector is treated as two equal halves, each holding a
        # quaternion at [quat_start_idx : quat_start_idx + quat_len].
        half = U.shape[1] // 2
        for base in (0, half):
            start = base + quat_start_idx
            stop = start + quat_len
            if stop > U.shape[1]:
                continue
            for i in range(U.shape[0]):
                diff[i, start] = quaternion_geodesic_distance(U[i, start:stop], V[i, start:stop])
            # The geodesic term already accounts for the whole quaternion, so
            # drop the remaining raw component differences; otherwise the
            # rotation is counted twice and q vs -q is penalised.
            diff[:, start + 1 : stop] = 0.0
    return float(np.sqrt(np.sum(weights * (diff ** 2))))

soft_dtw_distance

soft_dtw_distance(U, V, gamma=1.0)

Soft-DTW (differentiable) between two sequences.

Computed with a dynamic-programming recursion over the pairwise squared Euclidean costs, where the minimum over the three predecessor cells is smoothed by the gamma parameter. As gamma -> 0 this converges to hard DTW.

Source code in src/dhb_xr/database/similarity.py
def soft_dtw_distance(
    U: np.ndarray,
    V: np.ndarray,
    gamma: float = 1.0,
) -> float:
    """Soft-DTW alignment cost between two sequences.

    Fills an (n + 1) x (m + 1) accumulated-cost table: each cell adds the
    squared Euclidean distance between the two frames to ``_soft_min`` of the
    three predecessor cells.  ``gamma`` is forwarded unchanged to
    ``_soft_min``; smaller values are intended to approach hard DTW.

    Returns the bottom-right cell of the table as a float.
    """
    seq_a = np.asarray(U, dtype=np.float64)
    seq_b = np.asarray(V, dtype=np.float64)
    rows = len(seq_a)
    cols = len(seq_b)

    # Only the origin starts reachable; every other cell starts at infinity.
    acc = np.full((rows + 1, cols + 1), np.inf, dtype=np.float64)
    acc[0, 0] = 0.0

    for r, frame_a in enumerate(seq_a, start=1):
        for c, frame_b in enumerate(seq_b, start=1):
            delta = frame_a - frame_b
            cost = float(np.sum(delta ** 2))
            best_prev = _soft_min(acc[r - 1, c], acc[r, c - 1], acc[r - 1, c - 1], gamma)
            acc[r, c] = cost + best_prev
    return float(acc[rows, cols])

Overview

Store and retrieve trajectories using invariant-based similarity search.

Main Classes

MotionDatabase

MotionDatabase

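Store trajectories as invariants, with an optional FAISS index for fast retrieval. `add(positions, quaternions, metadata)` encodes a trajectory and stores it; `retrieve(query_positions, query_quaternions, k)` returns the top-k most similar stored trajectories (by L2 distance on the invariants, or by soft-DTW).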

Source code in src/dhb_xr/database/motion_db.py
class MotionDatabase:
    """
    Store trajectories as invariant sequences and retrieve the most similar ones.

    ``add(positions, quaternions, metadata)`` encodes a trajectory with
    ``encode_dhb_dr`` and stores the concatenated linear/angular invariants.
    ``retrieve(query_positions, query_quaternions, k)`` returns the top-k
    stored entries ranked by ``invariant_distance`` (default), by
    ``soft_dtw_distance`` (``use_dtw=True``), or through a FAISS flat L2 index
    when one is enabled and covers every stored entry.
    """

    def __init__(self, dhb_method: str = "double_reflection", use_faiss: bool = False):
        """
        Create an empty database.

        Args:
            dhb_method: ``"double_reflection"`` selects
                ``DHBMethod.DOUBLE_REFLECTION``; any other string selects
                ``DHBMethod.ORIGINAL``.
            use_faiss: Request a FAISS index. Only honoured when ``HAS_FAISS``
                is truthy; otherwise retrieval is always brute force.
        """
        self.dhb_method = DHBMethod.DOUBLE_REFLECTION if dhb_method == "double_reflection" else DHBMethod.ORIGINAL
        # Not read by any method of this class; kept for external users.
        self.k_inv = 4 if self.dhb_method == DHBMethod.DOUBLE_REFLECTION else 3
        self.invariants_list: List[np.ndarray] = []
        self.metadata_list: List[Dict[str, Any]] = []
        self.use_faiss = use_faiss and HAS_FAISS
        # Both are created lazily by the first non-empty add().
        self.index: Optional[Any] = None
        self.embedding_dim: Optional[int] = None

    def _encode(self, positions: np.ndarray, quaternions: np.ndarray) -> np.ndarray:
        """Encode a trajectory and join linear and angular invariants column-wise."""
        out = encode_dhb_dr(
            positions, quaternions,
            method=EncodingMethod.POSITION, use_default_initial_frames=True, dhb_method=self.dhb_method,
        )
        return np.concatenate([
            out["linear_motion_invariants"],
            out["angular_motion_invariants"],
        ], axis=1)

    def _top_k(self, dists: List[float], k: int) -> List[tuple]:
        """Return ``(invariants, metadata, distance)`` for the ``k`` smallest ``dists``."""
        order = np.argsort(dists)[:k]
        return [(self.invariants_list[i], self.metadata_list[i], dists[i]) for i in order]

    def _faiss_usable(self) -> bool:
        """
        Tell whether FAISS row ids line up with ``invariants_list`` positions.

        ``add()`` skips the index for empty or differently sized embeddings,
        so the two only agree while every stored entry has been indexed.
        """
        return bool(
            self.use_faiss
            and self.index is not None
            and self.embedding_dim is not None
            and self.index.ntotal == len(self.invariants_list)
        )

    def add(
        self,
        positions: np.ndarray,
        quaternions: np.ndarray,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> None:
        """
        Encode a trajectory and append it (with its metadata) to the database.

        Args:
            positions: Trajectory positions, forwarded to ``encode_dhb_dr``.
            quaternions: Trajectory orientations, forwarded to ``encode_dhb_dr``.
            metadata: Optional dict stored alongside the entry; ``None`` or an
                empty dict is stored as ``{}``.
        """
        U = self._encode(positions, quaternions)
        self.invariants_list.append(U.astype(np.float32))
        self.metadata_list.append(metadata or {})
        if not self.use_faiss or U.size == 0:
            return
        emb = U.flatten().reshape(1, -1).astype(np.float32)
        if self.embedding_dim is None:
            # The first indexed trajectory fixes the embedding size.
            self.embedding_dim = emb.shape[1]
            self.index = faiss.IndexFlatL2(self.embedding_dim)
        # Entries of another length cannot be indexed; retrieve() notices the
        # gap through _faiss_usable() and ranks by brute force instead.
        if emb.shape[1] == self.embedding_dim:
            self.index.add(emb)

    def retrieve(
        self,
        query_positions: np.ndarray,
        query_quaternions: np.ndarray,
        k: int = 10,
        use_dtw: bool = False,
    ) -> List[tuple]:
        """
        Return up to ``k`` stored entries most similar to the query trajectory.

        Args:
            query_positions: Query positions, forwarded to ``encode_dhb_dr``.
            query_quaternions: Query orientations, forwarded to ``encode_dhb_dr``.
            k: Maximum number of results; ``k <= 0`` yields an empty list.
            use_dtw: Rank with ``soft_dtw_distance`` instead of L2.

        Returns:
            A list of ``(invariants, metadata, distance)`` tuples ordered by
            increasing distance. An empty database yields ``[]``.
        """
        if k <= 0 or not self.invariants_list:
            return []
        U_q = self._encode(query_positions, query_quaternions)
        if use_dtw:
            return self._top_k([soft_dtw_distance(U_q, U) for U in self.invariants_list], k)

        # Use FAISS only when it indexes every entry and the query size matches.
        if self._faiss_usable():
            flat = U_q.flatten().astype(np.float32)
            if flat.shape[0] == self.embedding_dim:
                k_actual = min(k, len(self.invariants_list))
                distances, indices = self.index.search(flat.reshape(1, -1), k_actual)
                # IndexFlatL2 reports squared L2; take the root so distances
                # are on the same scale as the brute-force path below.
                return [
                    (
                        self.invariants_list[int(i)],
                        self.metadata_list[int(i)],
                        float(np.sqrt(max(float(d), 0.0))),
                    )
                    for d, i in zip(distances[0], indices[0])
                    if i >= 0
                ]

        # Brute-force fallback
        return self._top_k([invariant_distance(U_q, U) for U in self.invariants_list], k)

    def save(self, path: str) -> None:
        """Persist the database to disk.  Writes ``<path>.npz`` (invariants) and ``<path>.json`` (metadata)."""
        arrs = {f"inv_{i}": inv for i, inv in enumerate(self.invariants_list)}
        np.savez(path + ".npz", **arrs)
        meta = {
            "dhb_method": "double_reflection" if self.dhb_method == DHBMethod.DOUBLE_REFLECTION else "original",
            "metadata_list": self.metadata_list,
        }
        with open(path + ".json", "w") as f:
            json.dump(meta, f)

    @classmethod
    def load(cls, path: str) -> "MotionDatabase":
        """
        Load a database from ``<path>.npz`` and ``<path>.json``.

        The returned instance is created with ``use_faiss=False``; no FAISS
        index is rebuilt from the stored invariants.
        """
        with open(path + ".json", "r", encoding="utf-8") as f:
            meta = json.load(f)
        db = cls(dhb_method=meta["dhb_method"], use_faiss=False)
        db.metadata_list = meta["metadata_list"]
        # np.load keeps the .npz archive open until closed, so read every
        # array inside the context manager.
        with np.load(path + ".npz") as data:
            db.invariants_list = [
                data[f"inv_{i}"].astype(np.float32) for i in range(len(db.metadata_list))
            ]
        return db

Functions

load classmethod

load(path)

Load a database from <path>.npz and <path>.json.

Source code in src/dhb_xr/database/motion_db.py
@classmethod
def load(cls, path: str) -> "MotionDatabase":
    """
    Load a database from ``<path>.npz`` and ``<path>.json``.

    The JSON file supplies ``dhb_method`` and ``metadata_list``; one array
    named ``inv_<i>`` is read from the archive per metadata entry. The
    instance is created with ``use_faiss=False``.
    """
    with open(path + ".json", "r", encoding="utf-8") as f:
        meta = json.load(f)
    db = cls(dhb_method=meta["dhb_method"], use_faiss=False)
    db.metadata_list = meta["metadata_list"]
    # np.load keeps the .npz archive open until closed, so read every array
    # inside the context manager.
    with np.load(path + ".npz") as data:
        db.invariants_list = [
            data[f"inv_{i}"].astype(np.float32) for i in range(len(db.metadata_list))
        ]
    return db

save

save(path)

Persist the database to disk. Writes <path>.npz (invariants) and <path>.json (metadata).

Source code in src/dhb_xr/database/motion_db.py
def save(self, path: str) -> None:
    """
    Write the database to ``<path>.npz`` and ``<path>.json``.

    The archive holds one array per stored entry under the key ``inv_<i>``;
    the JSON file holds the DHB method name and the metadata list.
    """
    if self.dhb_method == DHBMethod.DOUBLE_REFLECTION:
        method_name = "double_reflection"
    else:
        method_name = "original"

    named_arrays = {}
    for position, invariants in enumerate(self.invariants_list):
        named_arrays[f"inv_{position}"] = invariants
    np.savez(path + ".npz", **named_arrays)

    payload = {"dhb_method": method_name, "metadata_list": self.metadata_list}
    with open(path + ".json", "w") as handle:
        json.dump(payload, handle)

Usage Example

from dhb_xr.database.motion_db import MotionDatabase
from dhb_xr.encoder.dhb_dr import encode_dhb_dr

# Create database
db = MotionDatabase()

# Add trajectories. add() takes raw positions and quaternions and encodes
# them to invariants itself, so no separate encode_dhb_dr call is needed.
for trajectory in trajectories:
    db.add(
        trajectory['positions'],
        trajectory['quaternions'],
        metadata=trajectory['metadata'],
    )

# Search similar trajectories. Each result is an
# (invariants, metadata, distance) tuple, closest first.
similar = db.retrieve(query_positions, query_quaternions, k=5)