Skip to content

DHB-XR

SE(3) invariant trajectory encoding for robotics and VLA applications

DHB-XR (DHB Extended Representations) provides SE(3) invariant trajectory encoding for robotics and VLA (Vision-Language-Action) applications. The library implements various DHB (Double-Helix Based) methods for encoding trajectories into invariant representations that are independent of global position and orientation changes.

Research Background

This library implements the double-reflection (DHB-DR) and quaternion-relative (DHB-QR) invariant representations for rigid-body motion trajectories on SE(3), as described in the manuscript "Double-Reflection DHB Invariant Representation on SE(3)".

Author: Andy Park (andypark.purdue@gmail.com)

Features

  • DHB-DR (Double-Reflection): Euler-based invariants (4 values per component)
  • DHB-QR (Quaternion-Relative): Quaternion-based invariants (5 values per component), no gimbal lock
  • DHB-TI (Time-Invariant): Speed-independent representations via arc-length reparameterization
  • Trajectory adaptation: Resample and decode with new initial pose; optional batched optimization
  • Tokenization: VQ-VAE / RVQ for DHB-Token (VLA action representation)
  • Motion database: Storage, similarity (L2, DTW), and retrieval
  • Imitation learning: Invariant matching, SE(3) geodesic, and hybrid losses
  • Robust encoding: Handles reversals, zero-motion segments, and frame alignment issues

Quick start

pip install dhb_xr
# or with extras: pip install dhb_xr[gpu,database,optimization]
from dhb_xr import encode_dhb_dr, decode_dhb_dr
from dhb_xr.core.types import DHBMethod, EncodingMethod
import numpy as np

# Create sample trajectory data
n = 50
positions = np.cumsum(np.random.randn(n, 3) * 0.01, axis=0)
quaternions = np.tile(np.array([1.0, 0.0, 0.0, 0.0]), (n, 1))  # identity orientation

# Encode trajectory to invariants
result = encode_dhb_dr(
    positions, quaternions,
    method=EncodingMethod.POSITION,
    use_default_initial_frames=True,
    dhb_method=DHBMethod.DOUBLE_REFLECTION
)

# Decode back to poses
decoded = decode_dhb_dr(
    result["linear_motion_invariants"],
    result["angular_motion_invariants"],
    result["initial_pose"],
    method=EncodingMethod.POSITION,
    dhb_method=DHBMethod.DOUBLE_REFLECTION,
    drop_padded=True
)

print(f"Original shape: {positions.shape}")
print(f"Decoded shape: {decoded['positions'].shape}")

Examples

See Examples for a walkthrough of the example scripts and how to run them.

VLA Integration

DHB-XR integrates with Vision-Language-Action (VLA) benchmarks:

  • LIBERO: Load trajectories, encode to DHB invariants, run in simulation
  • RoboCASA: HDF5 dataset support with motion retrieval

See VLA Integration Guide for setup and usage.

API Reference

dhb_xr

dhb_xr: DHB Extended Representations for SE(3) invariant trajectory encoding.

Classes

DHBMethod

Bases: Enum

DHB encoding method.

Source code in src/dhb_xr/core/types.py
class DHBMethod(Enum):
    """DHB encoding method."""

    ORIGINAL = "original"  # 3 invariants per component (magnitude + 2 angles)
    DOUBLE_REFLECTION = "double_reflection"  # 4 invariants (magnitude + Euler XYZ)
    QUATERNION = "quaternion"  # 5 invariants (magnitude + unit quaternion)

EncodingMethod

Bases: Enum

Encoding method for initial frame computation.

Source code in src/dhb_xr/core/types.py
class EncodingMethod(Enum):
    """Encoding method for initial frame computation."""

    POSITION = "pos"  # Position-based encoding: use initial position for frame origin
    VELOCITY = "vel"  # Velocity-based encoding: use first position difference for frame origin

ProcessedTrajectory dataclass

Result of trajectory preprocessing.

Source code in src/dhb_xr/preprocessing/trajectory_cleaner.py
@dataclass
class ProcessedTrajectory:
    """Result of trajectory preprocessing."""

    positions: np.ndarray
    quaternions: np.ndarray

    # Original data
    original_positions: np.ndarray = None
    original_quaternions: np.ndarray = None

    # Processing metadata
    was_aligned: bool = False
    alignment_rotation: np.ndarray = None  # 3x3 rotation matrix applied

    was_smoothed: bool = False
    smoothing_sigma: float = 0.0

    # Detected and handled issues
    removed_indices: List[int] = field(default_factory=list)
    interpolated_segments: List[Tuple[int, int]] = field(default_factory=list)

    # Diagnostics before/after
    diagnostics_before: TrajectoryDiagnostics = None
    diagnostics_after: TrajectoryDiagnostics = None

    def get_inverse_alignment(self) -> np.ndarray:
        """Get rotation to undo alignment (for decoding)."""
        if self.alignment_rotation is not None:
            return self.alignment_rotation.T
        return np.eye(3)
Functions
get_inverse_alignment
get_inverse_alignment()

Get rotation to undo alignment (for decoding).

Source code in src/dhb_xr/preprocessing/trajectory_cleaner.py
def get_inverse_alignment(self) -> np.ndarray:
    """Get rotation to undo alignment (for decoding)."""
    if self.alignment_rotation is not None:
        return self.alignment_rotation.T
    return np.eye(3)

TrajectoryDiagnostics dataclass

Comprehensive trajectory quality report.

Source code in src/dhb_xr/preprocessing/diagnostics.py
@dataclass
class TrajectoryDiagnostics:
    """Comprehensive trajectory quality report."""

    # Basic info
    num_samples: int = 0
    trajectory_length: float = 0.0

    # Reversal detection
    num_reversals: int = 0
    reversal_indices: List[int] = field(default_factory=list)
    min_tangent_dot_product: float = 1.0  # -1 indicates 180° reversal

    # Zero-motion detection
    num_zero_motion_segments: int = 0
    zero_motion_segments: List[Tuple[int, int]] = field(default_factory=list)
    total_zero_motion_frames: int = 0

    # Initial direction analysis
    initial_direction: np.ndarray = field(default_factory=lambda: np.array([1.0, 0.0, 0.0]))
    initial_direction_magnitude: float = 0.0
    alignment_angle_to_x: float = 0.0  # Angle between initial direction and +x

    # Rotation analysis
    max_angular_velocity: float = 0.0
    total_rotation_angle: float = 0.0

    # Quality metrics
    is_well_conditioned: bool = True
    recommended_preprocessing: List[PreprocessingRecommendation] = field(default_factory=list)

    def summary(self) -> str:
        """Return human-readable summary."""
        lines = [
            f"Trajectory Diagnostics ({self.num_samples} samples, {self.trajectory_length:.3f}m total)",
            f"  Reversals: {self.num_reversals} (min dot: {self.min_tangent_dot_product:.3f})",
            f"  Zero-motion segments: {self.num_zero_motion_segments} ({self.total_zero_motion_frames} frames)",
            f"  Initial direction: {self.initial_direction} (angle to +x: {np.degrees(self.alignment_angle_to_x):.1f}°)",
            f"  Well-conditioned: {self.is_well_conditioned}",
        ]
        if self.recommended_preprocessing:
            lines.append(f"  Recommended: {[r.value for r in self.recommended_preprocessing]}")
        return "\n".join(lines)
Functions
summary
summary()

Return human-readable summary.

Source code in src/dhb_xr/preprocessing/diagnostics.py
def summary(self) -> str:
    """Return human-readable summary."""
    lines = [
        f"Trajectory Diagnostics ({self.num_samples} samples, {self.trajectory_length:.3f}m total)",
        f"  Reversals: {self.num_reversals} (min dot: {self.min_tangent_dot_product:.3f})",
        f"  Zero-motion segments: {self.num_zero_motion_segments} ({self.total_zero_motion_frames} frames)",
        f"  Initial direction: {self.initial_direction} (angle to +x: {np.degrees(self.alignment_angle_to_x):.1f}°)",
        f"  Well-conditioned: {self.is_well_conditioned}",
    ]
    if self.recommended_preprocessing:
        lines.append(f"  Recommended: {[r.value for r in self.recommended_preprocessing]}")
    return "\n".join(lines)

TrajectoryPreprocessor

Prepare trajectories for DHB encoding.

Provides a configurable preprocessing pipeline to handle common issues in real-world trajectory data:

  • align_to_x: Rotate trajectory so initial motion aligns with +x. This ensures use_default_initial_frames=True works correctly.

  • smooth_reversals: Apply smoothing near detected 180° reversals. Prevents Euler singularities during encoding.

  • remove_zero_motion: Remove or interpolate zero-motion segments. Prevents undefined tangent frames.

  • gaussian_sigma: Overall Gaussian smoothing to reduce sensor noise.

Example

preprocessor = TrajectoryPreprocessor( ... align_to_x=True, ... smooth_reversals=True, ... gaussian_sigma=1.0, ... ) result = preprocessor.process(positions, quaternions)

Use result.positions, result.quaternions for encoding
After decoding, apply result.get_inverse_alignment() to restore original frame
Source code in src/dhb_xr/preprocessing/trajectory_cleaner.py
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
class TrajectoryPreprocessor:
    """
    Prepare trajectories for DHB encoding.

    Provides a configurable preprocessing pipeline to handle common
    issues in real-world trajectory data:

    - **align_to_x**: Rotate trajectory so initial motion aligns with +x.
      This ensures use_default_initial_frames=True works correctly.

    - **smooth_reversals**: Apply smoothing near detected 180° reversals.
      Prevents Euler singularities during encoding.

    - **remove_zero_motion**: Remove or interpolate zero-motion segments.
      Prevents undefined tangent frames.

    - **gaussian_sigma**: Overall Gaussian smoothing to reduce sensor noise.

    Example:
        >>> preprocessor = TrajectoryPreprocessor(
        ...     align_to_x=True,
        ...     smooth_reversals=True,
        ...     gaussian_sigma=1.0,
        ... )
        >>> result = preprocessor.process(positions, quaternions)
        >>> # Use result.positions, result.quaternions for encoding
        >>> # After decoding, apply result.get_inverse_alignment() to restore original frame
    """

    def __init__(
        self,
        align_to_x: bool = True,
        smooth_reversals: bool = True,
        remove_zero_motion: bool = True,
        interpolate_zero_motion: bool = True,
        gaussian_sigma: float = 0.0,
        reversal_threshold: float = -0.5,
        zero_motion_threshold: float = 1e-6,
        reversal_smoothing_radius: int = 3,
    ):
        """
        Initialize preprocessor.

        Args:
            align_to_x: Rotate trajectory to align initial direction with +x.
            smooth_reversals: Apply local smoothing near 180° reversals.
            remove_zero_motion: Handle zero-motion segments.
            interpolate_zero_motion: If True, interpolate zero-motion segments.
                If False, remove them entirely.
            gaussian_sigma: Sigma for Gaussian smoothing (0 = no smoothing).
            reversal_threshold: Dot product threshold for reversal detection.
            zero_motion_threshold: Distance threshold for zero motion.
            reversal_smoothing_radius: Number of samples to smooth around reversals.
        """
        self.align_to_x = align_to_x
        self.smooth_reversals = smooth_reversals
        self.remove_zero_motion = remove_zero_motion
        self.interpolate_zero_motion = interpolate_zero_motion
        self.gaussian_sigma = gaussian_sigma
        self.reversal_threshold = reversal_threshold
        self.zero_motion_threshold = zero_motion_threshold
        self.reversal_smoothing_radius = reversal_smoothing_radius

    def process(
        self,
        positions: np.ndarray,
        quaternions: np.ndarray,
        return_diagnostics: bool = True,
    ) -> ProcessedTrajectory:
        """
        Run full preprocessing pipeline.

        Order of operations:
        1. Analyze original trajectory
        2. Handle zero-motion segments (remove or interpolate)
        3. Smooth reversals (if detected)
        4. Apply Gaussian smoothing (if sigma > 0)
        5. Align to +x direction
        6. Analyze final trajectory

        Args:
            positions: (N, 3) position trajectory.
            quaternions: (N, 4) wxyz quaternion trajectory.
            return_diagnostics: Include before/after diagnostics.

        Returns:
            ProcessedTrajectory with cleaned data and metadata.
        """
        positions = np.asarray(positions, dtype=np.float64).copy()
        quaternions = np.asarray(quaternions, dtype=np.float64).copy()

        # Store originals
        original_positions = positions.copy()
        original_quaternions = quaternions.copy()

        # Initialize result
        result = ProcessedTrajectory(
            positions=positions,
            quaternions=quaternions,
            original_positions=original_positions,
            original_quaternions=original_quaternions,
        )

        # Analyze before
        if return_diagnostics:
            result.diagnostics_before = analyze_trajectory(
                positions, quaternions,
                reversal_threshold=self.reversal_threshold,
                zero_motion_threshold=self.zero_motion_threshold,
            )

        # Step 1: Handle zero-motion segments
        if self.remove_zero_motion:
            positions, quaternions, removed, interpolated = self._handle_zero_motion(
                positions, quaternions
            )
            result.removed_indices = removed
            result.interpolated_segments = interpolated

        # Step 2: Smooth reversals
        if self.smooth_reversals:
            reversal_indices, _ = detect_reversals(positions, self.reversal_threshold)
            if reversal_indices:
                positions, quaternions = self._smooth_around_reversals(
                    positions, quaternions, reversal_indices
                )

        # Step 3: Gaussian smoothing
        if self.gaussian_sigma > 0:
            positions, quaternions = self.smooth_trajectory(
                positions, quaternions, self.gaussian_sigma
            )
            result.was_smoothed = True
            result.smoothing_sigma = self.gaussian_sigma

        # Step 4: Align to +x
        if self.align_to_x:
            positions, quaternions, R = self.align_to_initial_direction(
                positions, quaternions
            )
            result.was_aligned = True
            result.alignment_rotation = R

        # Store final
        result.positions = positions
        result.quaternions = quaternions

        # Analyze after
        if return_diagnostics:
            result.diagnostics_after = analyze_trajectory(
                positions, quaternions,
                reversal_threshold=self.reversal_threshold,
                zero_motion_threshold=self.zero_motion_threshold,
            )

        return result

    def _handle_zero_motion(
        self,
        positions: np.ndarray,
        quaternions: np.ndarray,
    ) -> Tuple[np.ndarray, np.ndarray, List[int], List[Tuple[int, int]]]:
        """Handle zero-motion segments by removing or interpolating."""
        segments = detect_zero_motion(positions, self.zero_motion_threshold)

        if not segments:
            return positions, quaternions, [], []

        removed_indices = []
        interpolated_segments = []

        if self.interpolate_zero_motion:
            # Interpolate zero-motion segments
            for start, end in segments:
                positions, quaternions = self._interpolate_segment(
                    positions, quaternions, start, end
                )
                interpolated_segments.append((start, end))
            return positions, quaternions, [], interpolated_segments
        else:
            # Remove zero-motion frames (keep first frame of each segment)
            keep_mask = np.ones(len(positions), dtype=bool)
            for start, end in segments:
                keep_mask[start + 1:end + 1] = False
                removed_indices.extend(range(start + 1, end + 1))

            positions = positions[keep_mask]
            quaternions = quaternions[keep_mask]
            return positions, quaternions, removed_indices, []

    def _interpolate_segment(
        self,
        positions: np.ndarray,
        quaternions: np.ndarray,
        start: int,
        end: int,
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Interpolate a zero-motion segment."""
        # Get boundary positions
        p_start = positions[start]
        p_end = positions[min(end + 1, len(positions) - 1)]

        # Linear interpolation for positions
        segment_length = end - start + 1
        if segment_length > 0 and not np.allclose(p_start, p_end):
            for i, idx in enumerate(range(start, end + 1)):
                if idx < len(positions):
                    t = (i + 1) / (segment_length + 1)
                    positions[idx] = (1 - t) * p_start + t * p_end

        # SLERP for quaternions
        q_start = quaternions[start]
        q_end = quaternions[min(end + 1, len(quaternions) - 1)]

        if not np.allclose(q_start, q_end):
            try:
                # Convert to scipy Rotation (expects xyzw)
                r_start = Rotation.from_quat(np.roll(q_start, -1))  # wxyz to xyzw
                r_end = Rotation.from_quat(np.roll(q_end, -1))

                slerp = Slerp([0, 1], Rotation.concatenate([r_start, r_end]))

                for i, idx in enumerate(range(start, end + 1)):
                    if idx < len(quaternions):
                        t = (i + 1) / (segment_length + 1)
                        r_interp = slerp(t)
                        q_xyzw = r_interp.as_quat()
                        quaternions[idx] = np.roll(q_xyzw, 1)  # xyzw to wxyz
            except ValueError:
                pass  # Keep original if SLERP fails

        return positions, quaternions

    def _smooth_around_reversals(
        self,
        positions: np.ndarray,
        quaternions: np.ndarray,
        reversal_indices: List[int],
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Apply local smoothing around reversal points."""
        radius = self.reversal_smoothing_radius

        for idx in reversal_indices:
            # Get window around reversal
            start = max(0, idx - radius)
            end = min(len(positions), idx + radius + 1)

            if end - start < 3:
                continue

            # Apply local Gaussian smoothing
            window_positions = positions[start:end].copy()
            for dim in range(3):
                window_positions[:, dim] = gaussian_filter1d(
                    window_positions[:, dim], sigma=1.0, mode='nearest'
                )
            positions[start:end] = window_positions

        return positions, quaternions

    def smooth_trajectory(
        self,
        positions: np.ndarray,
        quaternions: np.ndarray,
        sigma: float = 1.0,
    ) -> Tuple[np.ndarray, np.ndarray]:
        """
        Apply Gaussian smoothing to trajectory.

        Args:
            positions: (N, 3) position trajectory.
            quaternions: (N, 4) quaternion trajectory.
            sigma: Gaussian sigma parameter.

        Returns:
            Smoothed (positions, quaternions).
        """
        positions = positions.copy()

        # Smooth positions
        for dim in range(3):
            positions[:, dim] = gaussian_filter1d(
                positions[:, dim], sigma=sigma, mode='nearest'
            )

        # For quaternions, we use a simple moving average approach
        # (proper SLERP-based smoothing is more complex)
        # This preserves the general orientation but reduces jitter
        quaternions = self._smooth_quaternions(quaternions, sigma)

        return positions, quaternions

    def _smooth_quaternions(
        self,
        quaternions: np.ndarray,
        sigma: float,
    ) -> np.ndarray:
        """Smooth quaternion trajectory (simple approach)."""
        quaternions = quaternions.copy()
        n = len(quaternions)

        if n < 3:
            return quaternions

        # Ensure quaternion continuity (flip sign if needed)
        for i in range(1, n):
            if np.dot(quaternions[i], quaternions[i - 1]) < 0:
                quaternions[i] = -quaternions[i]

        # Apply Gaussian filter to each component
        for dim in range(4):
            quaternions[:, dim] = gaussian_filter1d(
                quaternions[:, dim], sigma=sigma, mode='nearest'
            )

        # Renormalize
        norms = np.linalg.norm(quaternions, axis=1, keepdims=True)
        quaternions = quaternions / (norms + 1e-10)

        return quaternions

    def align_to_initial_direction(
        self,
        positions: np.ndarray,
        quaternions: np.ndarray,
        target_dir: np.ndarray = None,
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """
        Rotate trajectory so initial motion aligns with target direction.

        Args:
            positions: (N, 3) position trajectory.
            quaternions: (N, 4) wxyz quaternion trajectory.
            target_dir: Target direction (default: +x = [1,0,0]).

        Returns:
            Tuple of (aligned_positions, aligned_quaternions, rotation_matrix).
        """
        if target_dir is None:
            target_dir = np.array([1.0, 0.0, 0.0])

        target_dir = np.asarray(target_dir)
        target_dir = target_dir / (np.linalg.norm(target_dir) + 1e-10)

        # Compute initial direction
        from .diagnostics import compute_initial_direction
        init_dir, _ = compute_initial_direction(positions)

        # Check if already aligned
        if np.allclose(init_dir, target_dir, atol=1e-6):
            return positions, quaternions, np.eye(3)

        # Compute rotation from init_dir to target_dir
        R = self._rotation_between_vectors(init_dir, target_dir)

        # Apply rotation to positions (relative to first position)
        positions = positions.copy()
        origin = positions[0].copy()
        positions = (R @ (positions - origin).T).T + origin

        # Apply rotation to quaternions
        quaternions = quaternions.copy()
        R_quat = Rotation.from_matrix(R).as_quat()  # xyzw
        R_quat_wxyz = np.roll(R_quat, 1)  # wxyz

        for i in range(len(quaternions)):
            # Quaternion multiplication: R * q
            q = quaternions[i]
            q_rot = self._quat_multiply(R_quat_wxyz, q)
            quaternions[i] = q_rot

        return positions, quaternions, R

    def _rotation_between_vectors(
        self,
        v1: np.ndarray,
        v2: np.ndarray,
    ) -> np.ndarray:
        """Compute rotation matrix that rotates v1 to v2."""
        v1 = v1 / (np.linalg.norm(v1) + 1e-10)
        v2 = v2 / (np.linalg.norm(v2) + 1e-10)

        # Handle parallel/anti-parallel cases
        dot = np.dot(v1, v2)
        if dot > 0.9999:
            return np.eye(3)
        if dot < -0.9999:
            # Find orthogonal axis
            if abs(v1[0]) < 0.9:
                axis = np.cross(v1, np.array([1, 0, 0]))
            else:
                axis = np.cross(v1, np.array([0, 1, 0]))
            axis = axis / np.linalg.norm(axis)
            return Rotation.from_rotvec(np.pi * axis).as_matrix()

        # Rodrigues' rotation formula
        axis = np.cross(v1, v2)
        axis = axis / np.linalg.norm(axis)
        angle = np.arccos(np.clip(dot, -1, 1))

        return Rotation.from_rotvec(angle * axis).as_matrix()

    def _quat_multiply(self, q1: np.ndarray, q2: np.ndarray) -> np.ndarray:
        """Multiply quaternions (wxyz convention)."""
        w1, x1, y1, z1 = q1
        w2, x2, y2, z2 = q2
        return np.array([
            w1*w2 - x1*x2 - y1*y2 - z1*z2,
            w1*x2 + x1*w2 + y1*z2 - z1*y2,
            w1*y2 - x1*z2 + y1*w2 + z1*x2,
            w1*z2 + x1*y2 - y1*x2 + z1*w2,
        ])

    # Convenience methods for standalone use

    def detect_reversals(self, positions: np.ndarray) -> List[int]:
        """Find indices where 180° reversals occur."""
        indices, _ = detect_reversals(positions, self.reversal_threshold)
        return indices

    def detect_zero_motion(
        self,
        positions: np.ndarray,
    ) -> List[Tuple[int, int]]:
        """Find zero-motion segments as (start, end) tuples."""
        return detect_zero_motion(positions, self.zero_motion_threshold)
Functions
__init__
__init__(
    align_to_x=True,
    smooth_reversals=True,
    remove_zero_motion=True,
    interpolate_zero_motion=True,
    gaussian_sigma=0.0,
    reversal_threshold=-0.5,
    zero_motion_threshold=1e-06,
    reversal_smoothing_radius=3,
)

Initialize preprocessor.

Parameters:

Name Type Description Default
align_to_x bool

Rotate trajectory to align initial direction with +x.

True
smooth_reversals bool

Apply local smoothing near 180° reversals.

True
remove_zero_motion bool

Handle zero-motion segments.

True
interpolate_zero_motion bool

If True, interpolate zero-motion segments. If False, remove them entirely.

True
gaussian_sigma float

Sigma for Gaussian smoothing (0 = no smoothing).

0.0
reversal_threshold float

Dot product threshold for reversal detection.

-0.5
zero_motion_threshold float

Distance threshold for zero motion.

1e-06
reversal_smoothing_radius int

Number of samples to smooth around reversals.

3
Source code in src/dhb_xr/preprocessing/trajectory_cleaner.py
def __init__(
    self,
    align_to_x: bool = True,
    smooth_reversals: bool = True,
    remove_zero_motion: bool = True,
    interpolate_zero_motion: bool = True,
    gaussian_sigma: float = 0.0,
    reversal_threshold: float = -0.5,
    zero_motion_threshold: float = 1e-6,
    reversal_smoothing_radius: int = 3,
):
    """
    Initialize preprocessor.

    Args:
        align_to_x: Rotate trajectory to align initial direction with +x.
        smooth_reversals: Apply local smoothing near 180° reversals.
        remove_zero_motion: Handle zero-motion segments.
        interpolate_zero_motion: If True, interpolate zero-motion segments.
            If False, remove them entirely.
        gaussian_sigma: Sigma for Gaussian smoothing (0 = no smoothing).
        reversal_threshold: Dot product threshold for reversal detection.
        zero_motion_threshold: Distance threshold for zero motion.
        reversal_smoothing_radius: Number of samples to smooth around reversals.
    """
    self.align_to_x = align_to_x
    self.smooth_reversals = smooth_reversals
    self.remove_zero_motion = remove_zero_motion
    self.interpolate_zero_motion = interpolate_zero_motion
    self.gaussian_sigma = gaussian_sigma
    self.reversal_threshold = reversal_threshold
    self.zero_motion_threshold = zero_motion_threshold
    self.reversal_smoothing_radius = reversal_smoothing_radius
align_to_initial_direction
align_to_initial_direction(positions, quaternions, target_dir=None)

Rotate trajectory so initial motion aligns with target direction.

Parameters:

Name Type Description Default
positions ndarray

(N, 3) position trajectory.

required
quaternions ndarray

(N, 4) wxyz quaternion trajectory.

required
target_dir ndarray

Target direction (default: +x = [1,0,0]).

None

Returns:

Type Description
Tuple[ndarray, ndarray, ndarray]

Tuple of (aligned_positions, aligned_quaternions, rotation_matrix).

Source code in src/dhb_xr/preprocessing/trajectory_cleaner.py
def align_to_initial_direction(
    self,
    positions: np.ndarray,
    quaternions: np.ndarray,
    target_dir: np.ndarray = None,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Rotate trajectory so initial motion aligns with target direction.

    Args:
        positions: (N, 3) position trajectory.
        quaternions: (N, 4) wxyz quaternion trajectory.
        target_dir: Target direction (default: +x = [1,0,0]).

    Returns:
        Tuple of (aligned_positions, aligned_quaternions, rotation_matrix).
    """
    if target_dir is None:
        target_dir = np.array([1.0, 0.0, 0.0])

    target_dir = np.asarray(target_dir)
    target_dir = target_dir / (np.linalg.norm(target_dir) + 1e-10)

    # Compute initial direction
    from .diagnostics import compute_initial_direction
    init_dir, _ = compute_initial_direction(positions)

    # Check if already aligned
    if np.allclose(init_dir, target_dir, atol=1e-6):
        return positions, quaternions, np.eye(3)

    # Compute rotation from init_dir to target_dir
    R = self._rotation_between_vectors(init_dir, target_dir)

    # Apply rotation to positions (relative to first position)
    positions = positions.copy()
    origin = positions[0].copy()
    positions = (R @ (positions - origin).T).T + origin

    # Apply rotation to quaternions
    quaternions = quaternions.copy()
    R_quat = Rotation.from_matrix(R).as_quat()  # xyzw
    R_quat_wxyz = np.roll(R_quat, 1)  # wxyz

    for i in range(len(quaternions)):
        # Quaternion multiplication: R * q
        q = quaternions[i]
        q_rot = self._quat_multiply(R_quat_wxyz, q)
        quaternions[i] = q_rot

    return positions, quaternions, R
detect_reversals
detect_reversals(positions)

Find indices where 180° reversals occur.

Source code in src/dhb_xr/preprocessing/trajectory_cleaner.py
def detect_reversals(self, positions: np.ndarray) -> List[int]:
    """Find indices where 180° reversals occur."""
    indices, _ = detect_reversals(positions, self.reversal_threshold)
    return indices
detect_zero_motion
detect_zero_motion(positions)

Find zero-motion segments as (start, end) tuples.

Source code in src/dhb_xr/preprocessing/trajectory_cleaner.py
def detect_zero_motion(
    self,
    positions: np.ndarray,
) -> List[Tuple[int, int]]:
    """Find zero-motion segments as (start, end) tuples."""
    return detect_zero_motion(positions, self.zero_motion_threshold)
process
process(positions, quaternions, return_diagnostics=True)

Run full preprocessing pipeline.

Order of operations: 1. Analyze original trajectory 2. Handle zero-motion segments (remove or interpolate) 3. Smooth reversals (if detected) 4. Apply Gaussian smoothing (if sigma > 0) 5. Align to +x direction 6. Analyze final trajectory

Parameters:

Name Type Description Default
positions ndarray

(N, 3) position trajectory.

required
quaternions ndarray

(N, 4) wxyz quaternion trajectory.

required
return_diagnostics bool

Include before/after diagnostics.

True

Returns:

Type Description
ProcessedTrajectory

ProcessedTrajectory with cleaned data and metadata.

Source code in src/dhb_xr/preprocessing/trajectory_cleaner.py
def process(
    self,
    positions: np.ndarray,
    quaternions: np.ndarray,
    return_diagnostics: bool = True,
) -> ProcessedTrajectory:
    """
    Run full preprocessing pipeline.

    Order of operations:
    1. Analyze original trajectory
    2. Handle zero-motion segments (remove or interpolate)
    3. Smooth reversals (if detected)
    4. Apply Gaussian smoothing (if sigma > 0)
    5. Align to +x direction
    6. Analyze final trajectory

    Args:
        positions: (N, 3) position trajectory.
        quaternions: (N, 4) wxyz quaternion trajectory.
        return_diagnostics: Include before/after diagnostics.

    Returns:
        ProcessedTrajectory with cleaned data and metadata.
    """
    positions = np.asarray(positions, dtype=np.float64).copy()
    quaternions = np.asarray(quaternions, dtype=np.float64).copy()

    # Store originals
    original_positions = positions.copy()
    original_quaternions = quaternions.copy()

    # Initialize result
    result = ProcessedTrajectory(
        positions=positions,
        quaternions=quaternions,
        original_positions=original_positions,
        original_quaternions=original_quaternions,
    )

    # Analyze before
    if return_diagnostics:
        result.diagnostics_before = analyze_trajectory(
            positions, quaternions,
            reversal_threshold=self.reversal_threshold,
            zero_motion_threshold=self.zero_motion_threshold,
        )

    # Step 1: Handle zero-motion segments
    if self.remove_zero_motion:
        positions, quaternions, removed, interpolated = self._handle_zero_motion(
            positions, quaternions
        )
        result.removed_indices = removed
        result.interpolated_segments = interpolated

    # Step 2: Smooth reversals
    if self.smooth_reversals:
        reversal_indices, _ = detect_reversals(positions, self.reversal_threshold)
        if reversal_indices:
            positions, quaternions = self._smooth_around_reversals(
                positions, quaternions, reversal_indices
            )

    # Step 3: Gaussian smoothing
    if self.gaussian_sigma > 0:
        positions, quaternions = self.smooth_trajectory(
            positions, quaternions, self.gaussian_sigma
        )
        result.was_smoothed = True
        result.smoothing_sigma = self.gaussian_sigma

    # Step 4: Align to +x
    if self.align_to_x:
        positions, quaternions, R = self.align_to_initial_direction(
            positions, quaternions
        )
        result.was_aligned = True
        result.alignment_rotation = R

    # Store final
    result.positions = positions
    result.quaternions = quaternions

    # Analyze after
    if return_diagnostics:
        result.diagnostics_after = analyze_trajectory(
            positions, quaternions,
            reversal_threshold=self.reversal_threshold,
            zero_motion_threshold=self.zero_motion_threshold,
        )

    return result
smooth_trajectory
smooth_trajectory(positions, quaternions, sigma=1.0)

Apply Gaussian smoothing to trajectory.

Parameters:

Name Type Description Default
positions ndarray

(N, 3) position trajectory.

required
quaternions ndarray

(N, 4) quaternion trajectory.

required
sigma float

Gaussian sigma parameter.

1.0

Returns:

Type Description
Tuple[ndarray, ndarray]

Smoothed (positions, quaternions).

Source code in src/dhb_xr/preprocessing/trajectory_cleaner.py
def smooth_trajectory(
    self,
    positions: np.ndarray,
    quaternions: np.ndarray,
    sigma: float = 1.0,
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Apply Gaussian smoothing to trajectory.

    Args:
        positions: (N, 3) position trajectory.
        quaternions: (N, 4) quaternion trajectory.
        sigma: Gaussian sigma parameter.

    Returns:
        Smoothed (positions, quaternions).
    """
    positions = positions.copy()

    # Smooth positions
    for dim in range(3):
        positions[:, dim] = gaussian_filter1d(
            positions[:, dim], sigma=sigma, mode='nearest'
        )

    # For quaternions, we use a simple moving average approach
    # (proper SLERP-based smoothing is more complex)
    # This preserves the general orientation but reduces jitter
    quaternions = self._smooth_quaternions(quaternions, sigma)

    return positions, quaternions

Functions

analyze_trajectory

analyze_trajectory(
    positions, quaternions=None, reversal_threshold=-0.5, zero_motion_threshold=1e-06
)

Comprehensive trajectory quality analysis.

Detects potential issues that could cause DHB encoding problems: - 180° direction reversals (cause Euler singularities) - Zero-motion segments (cause undefined tangent frames) - Misaligned initial direction (reconstruction error with default frames)

Parameters:

Name Type Description Default
positions ndarray

(N, 3) position trajectory.

required
quaternions Optional[ndarray]

Optional (N, 4) wxyz quaternion trajectory.

None
reversal_threshold float

Dot product threshold for reversal detection.

-0.5
zero_motion_threshold float

Distance threshold for zero motion.

1e-06

Returns:

Type Description
TrajectoryDiagnostics

TrajectoryDiagnostics with analysis results and recommendations.

Source code in src/dhb_xr/preprocessing/diagnostics.py
def analyze_trajectory(
    positions: np.ndarray,
    quaternions: Optional[np.ndarray] = None,
    reversal_threshold: float = -0.5,
    zero_motion_threshold: float = 1e-6,
) -> TrajectoryDiagnostics:
    """
    Comprehensive trajectory quality analysis.

    Detects potential issues that could cause DHB encoding problems:
    - 180° direction reversals (cause Euler singularities)
    - Zero-motion segments (cause undefined tangent frames)
    - Misaligned initial direction (reconstruction error with default frames)

    Args:
        positions: (N, 3) position trajectory.
        quaternions: Optional (N, 4) wxyz quaternion trajectory.
        reversal_threshold: Dot product threshold for reversal detection.
        zero_motion_threshold: Distance threshold for zero motion.

    Returns:
        TrajectoryDiagnostics with analysis results and recommendations.
    """
    positions = np.asarray(positions)
    diagnostics = TrajectoryDiagnostics()

    # Basic info
    diagnostics.num_samples = len(positions)
    if len(positions) > 1:
        steps = np.diff(positions, axis=0)
        diagnostics.trajectory_length = np.sum(np.linalg.norm(steps, axis=1))

    # Reversal detection
    reversal_indices, min_dot = detect_reversals(positions, reversal_threshold)
    diagnostics.num_reversals = len(reversal_indices)
    diagnostics.reversal_indices = reversal_indices
    diagnostics.min_tangent_dot_product = min_dot

    # Zero-motion detection
    zero_segments = detect_zero_motion(positions, zero_motion_threshold)
    diagnostics.num_zero_motion_segments = len(zero_segments)
    diagnostics.zero_motion_segments = zero_segments
    diagnostics.total_zero_motion_frames = sum(end - start for start, end in zero_segments)

    # Initial direction
    init_dir, init_mag = compute_initial_direction(positions)
    diagnostics.initial_direction = init_dir
    diagnostics.initial_direction_magnitude = init_mag
    diagnostics.alignment_angle_to_x = compute_alignment_angle(init_dir)

    # Rotation analysis (if quaternions provided)
    if quaternions is not None:
        quaternions = np.asarray(quaternions)
        if len(quaternions) > 1:
            # Compute angular velocities
            from dhb_xr.core import geometry as geom
            angular_velocities = []
            for i in range(len(quaternions) - 1):
                R_prev = geom.quat_to_rot(quaternions[i])
                R_curr = geom.quat_to_rot(quaternions[i + 1])
                R_rel = R_curr @ R_prev.T
                axis_angle = geom.rot_to_axis_angle(R_rel)
                angular_velocities.append(np.linalg.norm(axis_angle))

            if angular_velocities:
                diagnostics.max_angular_velocity = max(angular_velocities)
                diagnostics.total_rotation_angle = sum(angular_velocities)

    # Determine if well-conditioned
    diagnostics.is_well_conditioned = (
        diagnostics.num_reversals == 0 and
        diagnostics.num_zero_motion_segments == 0 and
        diagnostics.alignment_angle_to_x < np.radians(30)  # Within 30° of +x
    )

    # Generate recommendations
    recommendations = []

    if diagnostics.alignment_angle_to_x > np.radians(10):
        recommendations.append(PreprocessingRecommendation.ALIGN_TO_X)

    if diagnostics.num_reversals > 0:
        recommendations.append(PreprocessingRecommendation.SMOOTH_TRAJECTORY)
        if diagnostics.min_tangent_dot_product < -0.8:
            recommendations.append(PreprocessingRecommendation.INTERPOLATE_REVERSALS)

    if diagnostics.num_zero_motion_segments > 0:
        recommendations.append(PreprocessingRecommendation.REMOVE_ZERO_MOTION)

    diagnostics.recommended_preprocessing = recommendations

    return diagnostics

decode_dhb_dr

decode_dhb_dr(
    linear_motion_invariants,
    angular_motion_invariants,
    initial_pose,
    method=EncodingMethod.POSITION,
    dhb_method=DHBMethod.DOUBLE_REFLECTION,
    drop_padded=True,
    robust_mode=False,
    validate_initial_pose=False,
    return_diagnostics=False,
)

Reconstruct poses from DHB invariants.

Parameters:

Name Type Description Default
linear_motion_invariants ndarray

(N, k) linear invariants

required
angular_motion_invariants ndarray

(N, k) angular invariants

required
initial_pose Dict[str, ndarray]

{'position': (3,), 'quaternion': (4,) wxyz}

required
method EncodingMethod

'pos' or 'vel'

POSITION
dhb_method DHBMethod

DHBMethod.ORIGINAL (3 inv) or DHBMethod.DOUBLE_REFLECTION (4 inv)

DOUBLE_REFLECTION
drop_padded bool

if True, return positions/quaternions[2:] to match encoder padding

True
robust_mode bool

Enable robustness features (NaN handling, normalization)

False
validate_initial_pose bool

Check initial pose validity before decoding

False
return_diagnostics bool

Include decoding diagnostics in output

False

Returns:

Type Description
Dict[str, Any]

dict with 'positions' (N,3), 'quaternions' (N,4) wxyz,

Dict[str, Any]

and optionally 'diagnostics' if return_diagnostics=True.

Source code in src/dhb_xr/decoder/dhb_dr.py
def decode_dhb_dr(
    linear_motion_invariants: np.ndarray,
    angular_motion_invariants: np.ndarray,
    initial_pose: Dict[str, np.ndarray],
    method: EncodingMethod = EncodingMethod.POSITION,
    dhb_method: DHBMethod = DHBMethod.DOUBLE_REFLECTION,
    drop_padded: bool = True,
    robust_mode: bool = False,
    validate_initial_pose: bool = False,
    return_diagnostics: bool = False,
) -> Dict[str, Any]:
    """
    Reconstruct poses from DHB invariants.

    Parameters:
        linear_motion_invariants: (N, k) linear invariants
        angular_motion_invariants: (N, k) angular invariants
        initial_pose: {'position': (3,), 'quaternion': (4,) wxyz}
        method: 'pos' or 'vel'
        dhb_method: DHBMethod.ORIGINAL (3 inv) or DHBMethod.DOUBLE_REFLECTION (4 inv)
        drop_padded: if True, return positions/quaternions[2:] to match encoder padding
        robust_mode: Enable robustness features (NaN handling, normalization)
        validate_initial_pose: Check initial pose validity before decoding
        return_diagnostics: Include decoding diagnostics in output

    Returns:
        dict with 'positions' (N,3), 'quaternions' (N,4) wxyz,
        and optionally 'diagnostics' if return_diagnostics=True.
    """
    diagnostics = DecodingDiagnostics() if return_diagnostics else None

    # Validate initial pose if requested
    if validate_initial_pose or robust_mode:
        if not _validate_initial_pose(initial_pose):
            if diagnostics:
                diagnostics.initial_pose_valid = False
            if robust_mode:
                warnings.warn("Invalid initial pose detected. Using identity pose.")
                initial_pose = {
                    "position": np.zeros(3),
                    "quaternion": np.array([1.0, 0.0, 0.0, 0.0])
                }
            else:
                raise ValueError("Invalid initial pose: position or quaternion is malformed or contains NaN/Inf")
    linear_inv = np.asarray(linear_motion_invariants, dtype=np.float64)
    angular_inv = np.asarray(angular_motion_invariants, dtype=np.float64)
    num_samples = linear_inv.shape[0]
    assert angular_inv.shape[0] == num_samples

    if dhb_method == DHBMethod.ORIGINAL:
        linear_magnitude = linear_inv[:, 0]
        linear_angle_y = linear_inv[:, 1]
        linear_angle_x = linear_inv[:, 2]
        angular_magnitude = angular_inv[:, 0]
        angular_angle_y = angular_inv[:, 1]
        angular_angle_x = angular_inv[:, 2]
    else:
        linear_magnitude = linear_inv[:, 0]
        linear_angles = linear_inv[:, 1:4]
        angular_magnitude = angular_inv[:, 0]
        angular_angles = angular_inv[:, 1:4]

    position_mode = method == EncodingMethod.POSITION
    init_pos = np.asarray(initial_pose["position"]).reshape(3)
    init_quat = np.asarray(initial_pose["quaternion"]).reshape(4)

    trajectory_position = np.zeros((num_samples, 3))
    trajectory_quaternions = np.zeros((num_samples, 4))
    trajectory_quaternions[0] = init_quat

    linear_frame = np.eye(4)
    linear_frame[:3, 3] = init_pos
    linear_frame[3, 3] = float(position_mode)

    angular_frame = np.eye(4)
    angular_rot_temp = geom.quat_to_rot(init_quat)

    prev_pos = init_pos.copy()

    for i in range(num_samples):
        if position_mode:
            trajectory_position[i] = linear_frame[:3, 3]

        if dhb_method == DHBMethod.ORIGINAL:
            R_lin = geom.y_rot(linear_angle_y[i]) @ geom.x_rot(linear_angle_x[i])
        else:
            R_lin = geom.euler_to_rot(linear_angles[i])

        trans = np.array([linear_magnitude[i], 0.0, 0.0])
        T = np.eye(4)
        T[:3, :3] = R_lin
        T[:3, 3] = trans
        T[3, 3] = float(position_mode)
        linear_frame = linear_frame @ T

        if not position_mode:
            trajectory_position[i] = linear_frame[:3, 3]

        # Handle NaN in robust mode
        if robust_mode:
            if not np.all(np.isfinite(trajectory_position[i])):
                if diagnostics:
                    diagnostics.num_nan_positions += 1
                trajectory_position[i] = prev_pos.copy()
            else:
                # Track step size for diagnostics
                step_size = np.linalg.norm(trajectory_position[i] - prev_pos)
                if diagnostics:
                    diagnostics.max_step_size = max(diagnostics.max_step_size, step_size)
                    diagnostics.trajectory_length += step_size
                prev_pos = trajectory_position[i].copy()

        rvec = angular_frame[:3, :3] @ np.array([angular_magnitude[i], 0.0, 0.0])

        if dhb_method == DHBMethod.ORIGINAL:
            R_ang = geom.y_rot(angular_angle_y[i]) @ geom.x_rot(angular_angle_x[i])
        else:
            R_ang = geom.euler_to_rot(angular_angles[i])

        angular_rot_temp = angular_rot_temp @ geom.axis_angle_to_rot(rvec).T
        if i < num_samples - 1:
            quat = geom.rot_to_quat(angular_rot_temp)
            # Normalize quaternion in robust mode
            if robust_mode:
                quat = _normalize_quaternion(quat)
                if not np.all(np.isfinite(quat)):
                    if diagnostics:
                        diagnostics.num_nan_quaternions += 1
                    quat = trajectory_quaternions[i].copy()  # Use previous
            trajectory_quaternions[i + 1] = quat
        angular_frame[:3, :3] = angular_frame[:3, :3] @ R_ang

    if drop_padded:
        trajectory_position = trajectory_position[2:]
        trajectory_quaternions = trajectory_quaternions[2:]

    result = {
        "positions": trajectory_position,
        "quaternions": trajectory_quaternions,
    }

    if return_diagnostics and diagnostics is not None:
        result["diagnostics"] = diagnostics

    return result

detect_reversals

detect_reversals(positions, threshold=-0.5)

Detect indices where motion direction reverses significantly.

A reversal is detected when consecutive tangent vectors have a dot product below the threshold (e.g., -0.5 means > 120° turn).

Parameters:

Name Type Description Default
positions ndarray

(N, 3) position trajectory.

required
threshold float

Dot product threshold for reversal detection. -1.0 = exact 180° reversal only -0.5 = 120° or larger turn 0.0 = 90° or larger turn

-0.5

Returns:

Type Description
Tuple[List[int], float]

Tuple of (reversal_indices, min_dot_product).

Source code in src/dhb_xr/preprocessing/diagnostics.py
def detect_reversals(
    positions: np.ndarray,
    threshold: float = -0.5,
) -> Tuple[List[int], float]:
    """
    Detect indices where motion direction reverses significantly.

    A reversal is detected when consecutive tangent vectors have a
    dot product below the threshold (e.g., -0.5 means > 120° turn).

    Args:
        positions: (N, 3) position trajectory.
        threshold: Dot product threshold for reversal detection.
            -1.0 = exact 180° reversal only
            -0.5 = 120° or larger turn
            0.0 = 90° or larger turn

    Returns:
        Tuple of (reversal_indices, min_dot_product).
    """
    positions = np.asarray(positions)
    if len(positions) < 3:
        return [], 1.0

    # Compute tangent vectors
    tangents = np.diff(positions, axis=0)
    norms = np.linalg.norm(tangents, axis=1, keepdims=True)

    # Normalize tangents (avoid division by zero)
    eps = 1e-10
    tangents_normalized = tangents / (norms + eps)

    # Compute consecutive dot products
    reversal_indices = []
    min_dot = 1.0

    for i in range(len(tangents_normalized) - 1):
        # Skip if either tangent is near-zero
        if norms[i] < eps or norms[i + 1] < eps:
            continue

        dot = np.dot(tangents_normalized[i], tangents_normalized[i + 1])
        min_dot = min(min_dot, dot)

        if dot < threshold:
            reversal_indices.append(i + 1)  # Index in original positions

    return reversal_indices, min_dot

detect_zero_motion

detect_zero_motion(positions, threshold=1e-06, min_segment_length=1)

Detect contiguous segments with zero or near-zero motion.

Parameters:

Name Type Description Default
positions ndarray

(N, 3) position trajectory.

required
threshold float

Distance threshold for zero motion.

1e-06
min_segment_length int

Minimum number of consecutive frames to report.

1

Returns:

Type Description
List[Tuple[int, int]]

List of (start_index, end_index) tuples for zero-motion segments.

Source code in src/dhb_xr/preprocessing/diagnostics.py
def detect_zero_motion(
    positions: np.ndarray,
    threshold: float = 1e-6,
    min_segment_length: int = 1,
) -> List[Tuple[int, int]]:
    """
    Detect contiguous segments with zero or near-zero motion.

    Args:
        positions: (N, 3) position trajectory.
        threshold: Distance threshold for zero motion.
        min_segment_length: Minimum number of consecutive frames to report.

    Returns:
        List of (start_index, end_index) tuples for zero-motion segments.
    """
    positions = np.asarray(positions)
    if len(positions) < 2:
        return []

    # Compute step distances
    steps = np.diff(positions, axis=0)
    distances = np.linalg.norm(steps, axis=1)

    # Find zero-motion frames
    is_zero = distances < threshold

    # Group into contiguous segments
    segments = []
    in_segment = False
    segment_start = 0

    for i, zero in enumerate(is_zero):
        if zero and not in_segment:
            in_segment = True
            segment_start = i
        elif not zero and in_segment:
            in_segment = False
            segment_length = i - segment_start
            if segment_length >= min_segment_length:
                segments.append((segment_start, i))

    # Handle segment at end
    if in_segment:
        segment_length = len(is_zero) - segment_start
        if segment_length >= min_segment_length:
            segments.append((segment_start, len(is_zero)))

    return segments

encode_dhb_dr

encode_dhb_dr(
    positions,
    quaternions,
    init_pose=None,
    method="pos",
    use_default_initial_frames=True,
    dhb_method=DHBMethod.DOUBLE_REFLECTION,
    robust_mode=False,
    reversal_threshold=-0.9,
    zero_motion_threshold=1e-06,
    validate_frames=False,
    return_diagnostics=False,
)

Compute DHB invariants (original or double-reflection).

Parameters:

Name Type Description Default
positions ndarray

(N, 3) position trajectory

required
quaternions ndarray

(N, 4) wxyz quaternion trajectory

required
init_pose Optional[Dict[str, ndarray]]

optional {'position': (3,), 'quaternion': (4,) wxyz}

None
method str

EncodingMethod for position or velocity-based encoding

'pos'
use_default_initial_frames bool

if True, pad and use identity-like frames

True
dhb_method DHBMethod

DHBMethod.ORIGINAL (3 inv) or DHBMethod.DOUBLE_REFLECTION (4 inv)

DOUBLE_REFLECTION
robust_mode bool

Enable robustness features (reversal detection, zero-motion handling)

False
reversal_threshold float

Dot product threshold for 180° detection (default -0.9)

-0.9
zero_motion_threshold float

Threshold for zero-motion detection

1e-06
validate_frames bool

Check frame orthogonality at each step

False
return_diagnostics bool

Include encoding diagnostics in output

False

Returns:

Type Description
Dict[str, Any]

dict with linear_motion_invariants, angular_motion_invariants,

Dict[str, Any]

linear_frame_initial, angular_frame_initial, initial_pose,

Dict[str, Any]

and optionally 'diagnostics' if return_diagnostics=True.

Source code in src/dhb_xr/encoder/dhb_dr.py
def encode_dhb_dr(
    positions: np.ndarray,
    quaternions: np.ndarray,
    init_pose: Optional[Dict[str, np.ndarray]] = None,
    method: str = "pos",
    use_default_initial_frames: bool = True,
    dhb_method: DHBMethod = DHBMethod.DOUBLE_REFLECTION,
    robust_mode: bool = False,
    reversal_threshold: float = -0.9,
    zero_motion_threshold: float = 1e-6,
    validate_frames: bool = False,
    return_diagnostics: bool = False,
) -> Dict[str, Any]:
    """
    Compute DHB invariants (original or double-reflection).

    Parameters:
        positions: (N, 3) position trajectory
        quaternions: (N, 4) wxyz quaternion trajectory
        init_pose: optional {'position': (3,), 'quaternion': (4,) wxyz}
        method: EncodingMethod for position or velocity-based encoding
        use_default_initial_frames: if True, pad and use identity-like frames
        dhb_method: DHBMethod.ORIGINAL (3 inv) or DHBMethod.DOUBLE_REFLECTION (4 inv)
        robust_mode: Enable robustness features (reversal detection, zero-motion handling)
        reversal_threshold: Dot product threshold for 180° detection (default -0.9)
        zero_motion_threshold: Threshold for zero-motion detection
        validate_frames: Check frame orthogonality at each step
        return_diagnostics: Include encoding diagnostics in output

    Returns:
        dict with linear_motion_invariants, angular_motion_invariants,
        linear_frame_initial, angular_frame_initial, initial_pose,
        and optionally 'diagnostics' if return_diagnostics=True.
    """
    positions = np.asarray(positions, dtype=np.float64)
    quaternions = np.asarray(quaternions, dtype=np.float64)
    n = positions.shape[0]
    assert n > 2 and quaternions.shape[0] == n, "Need >2 samples and matching lengths"

    if use_default_initial_frames:
        positions = np.vstack((positions[0], positions[0], positions))
        quaternions = np.vstack((quaternions[0], quaternions[0], quaternions))
    positions = np.vstack((positions, positions[-1], positions[-1], positions[-1]))
    quaternions = np.vstack((quaternions, quaternions[-1], quaternions[-1], quaternions[-1]))

    num_samples = positions.shape[0]
    initial_pose = init_pose or {
        "position": positions[0].copy(),
        "quaternion": quaternions[0].copy(),
    }
    initial_pose["quaternion"] = np.asarray(initial_pose["quaternion"]).reshape(4)
    initial_pose["position"] = np.asarray(initial_pose["position"]).reshape(3)

    position_diff = np.diff(positions, axis=0)
    rotation_diff = np.zeros((num_samples - 1, 3))
    for i in range(1, num_samples):
        R_prev = geom.quat_to_rot(quaternions[i - 1]).T
        R_curr = geom.quat_to_rot(quaternions[i]).T
        R_rel = R_curr @ R_prev.T
        rotation_diff[i - 1] = geom.rot_to_axis_angle(R_rel)

    num_steps = position_diff.shape[0]
    frames = _compute_initial_frames(
        position_diff, rotation_diff, initial_pose, method, use_default_initial_frames
    )

    k = 4 if dhb_method == DHBMethod.DOUBLE_REFLECTION else 3
    linear_inv = np.zeros((num_steps - 2, k))
    angular_inv = np.zeros((num_steps - 2, k))

    # Initialize diagnostics if needed
    diagnostics = EncodingDiagnostics() if (robust_mode or return_diagnostics) else None

    lx = frames["linear_frame_x"].copy()
    lx2 = frames["linear_frame_x2"].copy()
    ly = frames["linear_frame_y"].copy()
    ax = frames["angular_frame_x"].copy()
    ax2 = frames["angular_frame_x2"].copy()
    ay = frames["angular_frame_y"].copy()

    for i in range(num_steps - 2):
        # Handle zero-motion in robust mode
        pos_is_zero = _is_zero_motion(position_diff[i], zero_motion_threshold) if robust_mode else False
        rot_is_zero = _is_zero_motion(rotation_diff[i], zero_motion_threshold) if robust_mode else False

        if robust_mode and pos_is_zero and diagnostics is not None:
            diagnostics.num_zero_motion_frames += 1
            diagnostics.zero_motion_indices.append(i)

        # Compute next frame axes
        if robust_mode and pos_is_zero:
            # For zero motion, preserve previous frame direction
            lx3 = lx2.copy()
        else:
            lx3 = _compute_frame_axis_x(position_diff[i + 2], lx2)

        if robust_mode and rot_is_zero:
            ax3 = ax2.copy()
        else:
            ax3 = _compute_frame_axis_x(rotation_diff[i + 2], ax2)

        if dhb_method == DHBMethod.DOUBLE_REFLECTION:
            ly2, linear_R_rel, lin_fallback = _double_reflection_step(
                lx, ly, lx2, position_diff[i], position_diff[i + 1],
                robust_mode=robust_mode, reversal_threshold=reversal_threshold
            )
            ay2, angular_R_rel, ang_fallback = _double_reflection_step(
                ax, ay, ax2, rotation_diff[i], rotation_diff[i + 1],
                robust_mode=robust_mode, reversal_threshold=reversal_threshold
            )

            # Track reversals in diagnostics
            if diagnostics is not None and lin_fallback:
                if _detect_reversal(position_diff[i], position_diff[i + 1], reversal_threshold):
                    diagnostics.num_reversals_detected += 1
                    diagnostics.reversal_indices.append(i)

            linear_inv[i, 0] = np.dot(lx, position_diff[i])
            angular_inv[i, 0] = np.dot(ax, rotation_diff[i])
            linear_inv[i, 1:4] = geom.rot_to_euler(linear_R_rel)
            angular_inv[i, 1:4] = geom.rot_to_euler(angular_R_rel)
        else:
            ly2 = _compute_frame_axis_y(lx2, lx3, ly)
            ay2 = _compute_frame_axis_y(ax2, ax3, ay)
            if np.dot(ly, ly2) < 0:
                ly2 = -ly2
            if np.dot(ay, ay2) < 0:
                ay2 = -ay2
            linear_inv[i] = _compute_invariants_original(
                position_diff[i], lx, lx2, ly, ly2
            )
            angular_inv[i] = _compute_invariants_original(
                rotation_diff[i], ax, ax2, ay, ay2
            )
            # No fallback tracking for ORIGINAL method
            lin_fallback = False

        # Frame validation in robust mode
        if validate_frames and diagnostics is not None:
            if not _validate_frame(lx2, ly2):
                diagnostics.frame_validation_failures += 1
            if not _validate_frame(ax2, ay2):
                diagnostics.frame_validation_failures += 1

        lx, lx2, ly = lx2, lx3, ly2
        ax, ax2, ay = ax2, ax3, ay2

    result = {
        "linear_motion_invariants": linear_inv,
        "angular_motion_invariants": angular_inv,
        "linear_frame_initial": frames["linear_frame_initial"],
        "angular_frame_initial": frames["angular_frame_initial"],
        "initial_pose": initial_pose,
    }

    if return_diagnostics and diagnostics is not None:
        result["diagnostics"] = diagnostics

    return result

plot_invariants

plot_invariants(linear_inv, angular_inv, ax=None, title='DHB invariants')

Plot linear and angular invariant time series.

Source code in src/dhb_xr/visualization/plot.py
def plot_invariants(
    linear_inv: np.ndarray,
    angular_inv: np.ndarray,
    ax: Optional[Any] = None,
    title: str = "DHB invariants",
) -> Any:
    """Plot linear and angular invariant time series."""
    if not HAS_MPL:
        raise ImportError("matplotlib required for plot_invariants")
    n = linear_inv.shape[0]
    k_lin, k_ang = linear_inv.shape[1], angular_inv.shape[1]
    if ax is None:
        fig, axes = plt.subplots(2, 1, sharex=True)
        ax_lin, ax_ang = axes[0], axes[1]
    else:
        ax_lin, ax_ang = ax
    t = np.arange(n)
    for j in range(k_lin):
        ax_lin.plot(t, linear_inv[:, j], label=f"lin_{j}")
    ax_lin.set_ylabel("linear")
    ax_lin.legend(loc="right", fontsize=8)
    ax_lin.set_title(title)
    for j in range(k_ang):
        ax_ang.plot(t, angular_inv[:, j], label=f"ang_{j}")
    ax_ang.set_ylabel("angular")
    ax_ang.legend(loc="right", fontsize=8)
    ax_ang.set_xlabel("step")
    return (ax_lin, ax_ang)

plot_se3_trajectories

plot_se3_trajectories(
    trajectories,
    ax=None,
    show_orientation=True,
    vis_type="cube",
    box_size_scale=0.05,
    num_frames=6,
    title="SE(3) Trajectories",
    show_legend=True,
)

Plot multiple SE(3) trajectories with orientation visualization.

Parameters:

Name Type Description Default
trajectories Union[List[Dict], Dict[str, Dict]]

List of dicts with 'positions' and 'quaternions', or dict of name -> trajectory

required
ax Optional[Any]

Matplotlib 3D axes

None
show_orientation bool

Show orientation cubes/frames

True
vis_type str

"cube" or "arrow"

'cube'
box_size_scale float

Size of cubes

0.05
num_frames int

Number of orientation samples per trajectory

6
title str

Plot title

'SE(3) Trajectories'
show_legend bool

Show legend

True

Returns:

Type Description
Any

Matplotlib axes

Source code in src/dhb_xr/visualization/plot.py
def plot_se3_trajectories(
    trajectories: Union[List[Dict], Dict[str, Dict]],
    ax: Optional[Any] = None,
    show_orientation: bool = True,
    vis_type: str = "cube",
    box_size_scale: float = 0.05,
    num_frames: int = 6,
    title: str = "SE(3) Trajectories",
    show_legend: bool = True,
) -> Any:
    """
    Plot multiple SE(3) trajectories with orientation visualization.

    Args:
        trajectories: List of dicts with 'positions' and 'quaternions', or dict of name -> trajectory
        ax: Matplotlib 3D axes
        show_orientation: Show orientation cubes/frames
        vis_type: "cube" or "arrow"
        box_size_scale: Size of cubes
        num_frames: Number of orientation samples per trajectory
        title: Plot title
        show_legend: Show legend

    Returns:
        Matplotlib axes
    """
    if not HAS_MPL:
        raise ImportError("matplotlib required for plot_se3_trajectories")

    # Normalize input to dict
    if isinstance(trajectories, list):
        trajectories = {f"traj_{i}": t for i, t in enumerate(trajectories)}

    if ax is None:
        fig = plt.figure(figsize=(10, 8))
        ax = fig.add_subplot(111, projection="3d")

    colors = ["blue", "green", "red", "orange", "purple", "cyan", "magenta", "brown"]
    box_size = (box_size_scale, box_size_scale, box_size_scale)

    for idx, (name, traj) in enumerate(trajectories.items()):
        positions = np.asarray(traj["positions"])
        quaternions = np.asarray(traj.get("quaternions"))
        color = colors[idx % len(colors)]

        ax.plot(positions[:, 0], positions[:, 1], positions[:, 2], color=color, label=name, linewidth=2)

        if show_orientation and quaternions is not None:
            from dhb_xr.core import geometry as geom
            n = len(positions)
            indices = np.linspace(0, n - 1, num_frames, dtype=int)
            for i in indices:
                R = geom.quat_to_rot(quaternions[i])
                p = positions[i]
                if vis_type == "cube":
                    draw_box(ax, p, R, size=box_size, color=color, alpha=0.5)
                else:
                    draw_frame(ax, p, R, length=0.05)

    ax.set_xlabel("x")
    ax.set_ylabel("y")
    ax.set_zlabel("z")
    ax.set_title(title)
    if show_legend:
        ax.legend()

    return ax

plot_se3_trajectory

plot_se3_trajectory(
    positions,
    quaternions=None,
    ax=None,
    title="SE(3) trajectory",
    show_orientation=False,
    vis_type="arrow",
    box_size=(0.03, 0.03, 0.03),
    color="b",
    alpha=0.7,
    num_frames=8,
    label=None,
)

Plot 3D position trajectory with optional orientation visualization.

Parameters:

Name Type Description Default
positions ndarray

(N, 3) position array

required
quaternions Optional[ndarray]

(N, 4) quaternion array (wxyz), optional

None
ax Optional[Any]

Matplotlib 3D axes, created if None

None
title str

Plot title

'SE(3) trajectory'
show_orientation bool

If True, show orientation frames or cubes

False
vis_type str

"arrow" for coordinate frames, "cube" for oriented boxes

'arrow'
box_size tuple

Size of cubes (if vis_type="cube")

(0.03, 0.03, 0.03)
color str

Trajectory line color

'b'
alpha float

Transparency for cubes

0.7
num_frames int

Number of orientation samples to show along trajectory

8
label Optional[str]

Legend label for trajectory

None

Returns:

Type Description
Any

Matplotlib axes

Source code in src/dhb_xr/visualization/plot.py
def plot_se3_trajectory(
    positions: np.ndarray,
    quaternions: Optional[np.ndarray] = None,
    ax: Optional[Any] = None,
    title: str = "SE(3) trajectory",
    show_orientation: bool = False,
    vis_type: str = "arrow",
    box_size: tuple = (0.03, 0.03, 0.03),
    color: str = "b",
    alpha: float = 0.7,
    num_frames: int = 8,
    label: Optional[str] = None,
) -> Any:
    """
    Plot 3D position trajectory with optional orientation visualization.

    Args:
        positions: (N, 3) position array
        quaternions: (N, 4) quaternion array (wxyz), optional
        ax: Matplotlib 3D axes, created if None
        title: Plot title
        show_orientation: If True, show orientation frames or cubes
        vis_type: "arrow" for coordinate frames, "cube" for oriented boxes
        box_size: Size of cubes (if vis_type="cube")
        color: Trajectory line color
        alpha: Transparency for cubes
        num_frames: Number of orientation samples to show along trajectory
        label: Legend label for trajectory

    Returns:
        Matplotlib axes
    """
    if not HAS_MPL:
        raise ImportError("matplotlib required for plot_se3_trajectory")
    positions = np.asarray(positions)
    if ax is None:
        fig = plt.figure()
        ax = fig.add_subplot(111, projection="3d")
    ax.plot(positions[:, 0], positions[:, 1], positions[:, 2], "-", color=color, label=label or "position", alpha=0.8)
    ax.set_xlabel("x")
    ax.set_ylabel("y")
    ax.set_zlabel("z")
    ax.set_title(title)

    if show_orientation and quaternions is not None:
        from dhb_xr.core import geometry as geom
        n = len(positions)
        indices = np.linspace(0, n - 1, num_frames, dtype=int)
        for i in indices:
            R = geom.quat_to_rot(quaternions[i])
            p = positions[i]
            if vis_type == "cube":
                draw_box(ax, p, R, size=box_size, color=color, alpha=alpha)
            else:  # arrow
                draw_frame(ax, p, R, length=0.05)
    return ax

License

MIT License.

Copyright (c) 2026 Andy Park andypark.purdue@gmail.com