modifiers
Classes:
| Name | Description |
|---|---|
DelayedObservation |
A modifier used to return a stochastically delayed (stale) version of |
DelayedObservationCfg |
Configuration parameters for a delayed observation modifier. |
DelayedObservation
DelayedObservation(cfg: DelayedObservationCfg, data_dim: tuple[int, ...], device: str)
Bases: ModifierBase
A modifier used to return a stochastically delayed (stale) version of an observation term. This can also be used to model multi-rate observations for non-sensor terms, e.g., pure MDP terms or proprioceptive terms.
This modifier takes an existing observation term/function, pushes each new batched observation into a DelayBuffer, and returns an older sample according to a per-environment integer time-lag. Lags are drawn uniformly from [min_lag, max_lag], with an optional probability to hold the previous lag (to mimic repeated frames). With 'update_period>0' (multi-rate), new lags are applied only on refresh ticks, which occur every update_period policy steps. Between refreshes the realised lag can increase at most by +1 (frame hold). This process is causal: the lag for each environment can only increase by 1 each step, ensuring that the returned observation is never older than the previous step's lagged observation.
Shapes are preserved: the returned tensor has the exact shape of the wrapped
term ([num_envs, *obs_shape]).
Configuration:
min_lag (int): Minimum time-lag (in steps) to sample. Default 0.
max_lag (int): Maximum time-lag (in steps) to sample. Default 3.
per_env (bool): If True, sample a different lag for each environment.
If False, use the same lag for all envs. Default True.
hold_prob (float): Probability in [0, 1] of holding the previous lag
instead of sampling a new one. Default 0.0 (always sample new).
update_period (int): If > 0, apply new lags every update_period
policy steps (models a lower sensor cadence). Between updates, the
lag can increase by at most +1 each step (frame hold). If 0 (default),
update every step.
per_env_phase (bool): Only relevant if update_period > 0. If True,
each environment has a different random phase offset for lag updates.
If False, all envs update their lag simultaneously. Default True.
Example: .. code-block:: python
# create a height_scan observation using the delayed observation modifier
from isaaclab.utils.modifiers import DelayedObservation
height_scan = ObservationTermCfg(
func=mdp.height_scan,
params={"sensor_cfg": SceneEntityCfg("height_scanner")},
noise=Unoise(n_min=-0.1, n_max=0.1),
clip=(-1.0, 1.0),
modifiers=[
modifiers.DelayedObservationCfg(
min_lag=0,
max_lag=3,
per_env=True,
hold_prob=0.66,
update_period=3,
per_env_phase=True,
)
],
)
Initialize the DelayedObservation modifier.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
|
DelayedObservationCfg
|
Configuration parameters. |
required |
Methods:
| Name | Description |
|---|---|
reset |
Resets the delay buffer and internal state. Since the DelayBuffer |
__call__ |
Add the current data to the delay buffer and return a stale sample |
reset
reset(env_ids: Sequence[int] | None = None)
Resets the delay buffer and internal state. Since the DelayBuffer does not support partial resets, if env_ids is not None, only the previous lags for those envs are reset to zero, forcing the latest observation to be returned on the next call preventing observations from before the reset being returned.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
|
Sequence[int] | None
|
The environment ids. Defaults to None, in which case all environments are considered. |
None
|
__call__
__call__(data: Tensor) -> torch.Tensor
Add the current data to the delay buffer and return a stale sample according to the current lag for each environment.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
|
Tensor
|
The data to apply delay to. |
required |
Returns:
| Type | Description |
|---|---|
Tensor
|
Delayed data. Shape is the same as data. |
DelayedObservationCfg
Bases: ModifierCfg
Configuration parameters for a delayed observation modifier.
For more information, please check the :class:DelayedObservation class.
Attributes:
| Name | Type | Description |
|---|---|---|
func |
type[DelayedObservation]
|
The delayed observation function to be called for applying the delay. |
min_lag |
int
|
The minimum lag (in number of policy steps) to be applied to the observations. Defaults to 0. |
max_lag |
int
|
The maximum lag (in number of policy steps) to be applied to the observations. |
per_env |
bool
|
Whether to use a separate lag for each environment. |
hold_prob |
float
|
The probability of holding the previous lag when updating the lag. |
update_period |
int
|
The period (in number of policy steps) at which the lag is updated. |
per_env_phase |
bool
|
Whether to use a separate phase for each environment when updating the lag. |
func
class-attribute
instance-attribute
func: type[DelayedObservation] = DelayedObservation
The delayed observation function to be called for applying the delay.
min_lag
class-attribute
instance-attribute
min_lag: int = 0
The minimum lag (in number of policy steps) to be applied to the observations. Defaults to 0.
max_lag
class-attribute
instance-attribute
max_lag: int = 3
The maximum lag (in number of policy steps) to be applied to the observations.
This value must be greater than or equal to :attr:min_lag.
per_env
class-attribute
instance-attribute
per_env: bool = True
Whether to use a separate lag for each environment.
hold_prob
class-attribute
instance-attribute
hold_prob: float = 0.0
The probability of holding the previous lag when updating the lag.
update_period
class-attribute
instance-attribute
update_period: int = 1
The period (in number of policy steps) at which the lag is updated.
If set to 0, the lag is sampled once at the beginning and remains constant throughout the simulation.
If set to a positive integer, the lag is updated every update_period policy steps. Defaults to 1.
This value must be less than or equal to :attr:max_lag if it is greater than 0.
per_env_phase
class-attribute
instance-attribute
per_env_phase: bool = True
Whether to use a separate phase for each environment when updating the lag.
If set to True, each environment will have its own phase when updating the lag. If set to False, all environments will share the same phase. Defaults to True.