Task System#
1. Architecture and Philosophy#
In RoboVerse, a task is a wrapper built on top of a Handler that exposes Gym-style APIs (step, reset, etc.). Simulation contents (robots, objects, scene, physics params) live in a ScenarioCfg and are instantiated by a Handler. Task logic (reward, observation, termination, etc.) is layered on top via wrappers. This enforces a clean separation between simulation, task, and algorithm.
A task is created with:
- scenario: a ScenarioCfg describing the simulation.
- device: the execution device (e.g., CPU/GPU).
When defining a new task, inherit from BaseTaskEnv and implement methods like _observation, _reward, _terminated, _time_out, _observation_space, _action_space, and _extra_spec.
Tasks are managed by a registry system, where each task is bound to a unique string ID (e.g., "example.my_task"). This design provides:
- One-click switching: run a different task by simply changing a string in configs or CLI args (see the sketch below).
- Unified interface: all tasks share the same API, regardless of simulator or logic.
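For example, switching tasks is just a string change (a minimal sketch; the alternative task ID is hypothetical):
import torch
from metasim.task.registry import get_task_class

# The surrounding code never changes; only the ID string does.
task_cls = get_task_class("example.my_task")  # or e.g. "example.another_task" (hypothetical)
env = task_cls(scenario=task_cls.scenario, device=torch.device("cpu"))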
2. Task Instantiation Workflow#
Typical instantiation of a task for training looks like:
task_cls = get_task_class(args.task)

# Get the default scenario from the task class and update it with overrides
scenario = task_cls.scenario.update(
    robots=[args.robot],
    simulator=args.sim,
    num_envs=args.num_envs,
    headless=args.headless,
    cameras=[],
)

# Create the task env from the class fetched via the registry
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
env = task_cls(scenario=scenario, device=device)
Key points:
- get_task_class(name) fetches the task class by its string identifier from the registry.
- Each task class provides a default scenario config (task_cls.scenario) with standard robot, object, and asset definitions.
- Users can update this config (simulator choice, camera list, env count, etc.) via scenario.update().
- The updated ScenarioCfg is then passed into the task class to instantiate a working environment.
This workflow ensures tasks are:
- Customizable (override any part of the scenario at runtime).
- Consistent (the task class always defines a sane default).
- Simulator‑agnostic (only the Handler changes underneath; see the sketch below).
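For instance, moving the same task to another simulator is a one-field override (a sketch; the non-MuJoCo backend name is illustrative, and this assumes update() returns a modified config):
# Same task, different physics backend; only the Handler changes underneath.
scenario_mj = task_cls.scenario.update(simulator="mujoco")
scenario_alt = task_cls.scenario.update(simulator="isaacgym")  # illustrative backend name
env = task_cls(scenario=scenario_alt, device=device)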
3. Instantiation Entry Points#
3.1 Via Task Registry#
"""Train PPO for a reaching task using RLTaskEnv."""
from metasim.task.registry import get_task_class
import torch
task_cls = get_task_class(args.task) # e.g., "example.my_task"
# Start from the class-provided default scenario and override as needed
scenario = task_cls.scenario.update(
robots=[args.robot],
simulator=args.sim,
num_envs=args.num_envs,
headless=args.headless,
cameras=[],
)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
env = task_cls(scenario=scenario, device=device)
3.2 Via make_vec#
make_vec provides a standardized helper that wraps task instantiation in a Gym‑compatible API. It is the recommended entry point for creating environments.
from metasim.task.gym_registration import make_vec

env = make_vec(
    env_id,  # e.g., "example.my_task"
    num_envs=args.num_envs,
    robots=[args.robot],
    simulator=args.sim,
    headless=args.headless,
    cameras=[camera] if args.save_video else [],
    device=args.device,
)
Key points:
- Each task class provides a default scenario (task_cls.scenario) with standard robots/objects/assets.
- Use scenario.update(...) to override simulator, cameras, env count, etc.
- The final ScenarioCfg is passed into the task class or wrapped via make_vec for Gym API compatibility.
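After either entry point, interaction follows the Gym-style loop. A minimal random rollout might look like this (a sketch; it assumes the env exposes action_space.sample(), close(), and the five-tuple step return described in Section 6):
obs, info = env.reset()
for _ in range(1000):
    actions = env.action_space.sample()  # random actions (assumed API)
    obs, reward, terminated, timeout, info = env.step(actions)
env.close()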
4. Task Registration & Auto‑Import#
4.1 Auto‑import paths#
Task modules under the following directories are auto‑imported and registered at runtime:
- metasim/example/example_pack/tasks
- roboverse_pack/tasks
For new project tasks, place modules under roboverse_pack/tasks.
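A project task might therefore be laid out like this (hypothetical file names; only the directory is prescribed):
roboverse_pack/
└── tasks/
    ├── __init__.py
    └── my_task.py    # defines and registers "example.my_task" (see 4.2)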
4.2 How to register a task#
from metasim.task.base import BaseTaskEnv
from metasim.task.gym_registration import register_task
from metasim.scenario.scenario import ScenarioCfg

@register_task("example.my_task")
class MyExampleTask(BaseTaskEnv):
    scenario = ScenarioCfg(robots=["franka"], simulator="mujoco", cameras=[])

    def _observation(self, state): ...
    def _privileged_observation(self, state): ...
    def _reward(self, state, action, next_state=None): ...
    def _terminated(self, state): ...
    def _time_out(self, step_count): ...
    def _observation_space(self): ...
    def _action_space(self): ...
    def _extra_spec(self): ...
    def step(self, actions): ...
    def reset(self, states, env_ids): ...
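Once the module is auto-imported, the task is reachable by its ID through either entry point from Section 3, e.g.:
import torch
from metasim.task.registry import get_task_class

task_cls = get_task_class("example.my_task")
env = task_cls(scenario=task_cls.scenario.update(num_envs=4), device=torch.device("cpu"))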
5. Migrating a New Task#
5.1 Direct Integration (Quick)#
- Copy external task code into roboverse_learn/.
- Replace simulator‑specific APIs with Handler equivalents (sketched below).
- Convert observations to TensorState via get_state().
- Move sim details (assets, timestep, decimation) into ScenarioCfg.
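As a sketch of the second step, a direct simulator call is replaced with the Handler equivalents from the step flow in Section 6 (the "before" lines are illustrative MuJoCo code; the wrapping function is hypothetical):
def migrated_step(handler, actions):
    # Before (simulator-specific, illustrative):
    #   mujoco.mj_step(model, data)
    # After (Handler-based, simulator-agnostic):
    handler.set_dof_targets(actions)
    handler.simulate()
    return handler.get_states()  # returns a TensorState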
5.2 Structured Wrapper Integration#
- Subclass BaseTaskWrapper.
- Implement _reward(), _observation(), _terminated().
- Use the hooks pre_sim_step, post_sim_step, and reset_callback.
- Reuse the Handler + ScenarioCfg separation (see the skeleton below).
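A minimal wrapper skeleton under this scheme might look as follows (a sketch; the import path and hook signatures are assumptions based on the names above):
from metasim.task.wrapper import BaseTaskWrapper  # assumed import path

class MyMigratedTask(BaseTaskWrapper):
    def _observation(self, state): ...
    def _reward(self, state, action, next_state=None): ...
    def _terminated(self, state): ...

    def pre_sim_step(self, actions):
        # e.g., rescale incoming actions before simulation
        return actions

    def post_sim_step(self, states):
        # e.g., cache quantities needed by _reward
        return states

    def reset_callback(self, env_ids):
        # e.g., re-randomize goals for the reset envs
        ...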
6. BaseTaskEnv & RLTaskEnv#
BaseTaskEnv (core behavior)#
- Default observation: returns the simulator’s TensorState directly via _observation(env_states) (structured tensor, not flattened).
- Initialization: accepts a ScenarioCfg or a pre‑built BaseSimHandler. Internally resolves the handler and calls launch().
- Callbacks: pre_physics_step_callback, post_physics_step_callback, reset_callback, close_callback.
- Episode control: per‑env step counter self._episode_steps; timeout handled by _time_out (default based on max_episode_steps).
- Step flow:
  1. pre_physics_step_callback(actions)
  2. handler.set_dof_targets(actions)
  3. handler.simulate()
  4. env_states = handler.get_states()
  5. post_physics_step_callback(env_states)
  6. Compute reward, terminated, and timeout, then return (obs, reward, terminated, timeout, info) with privileged_observation.
- Reset flow: can use external states or fall back to _initial_states. Calls handler.set_states(...), fetches env_states, and resets episode counters.
- Flexible override: you can also override step() and reset() directly, bypassing the callback system entirely (see the example below).
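For example, a subclass can tap into the step flow without replacing step() itself (a sketch; the callback signatures follow the step-flow list above):
from metasim.task.base import BaseTaskEnv

class CachingTask(BaseTaskEnv):
    def pre_physics_step_callback(self, actions):
        # runs before the handler applies the actions
        self._last_actions = actions

    def post_physics_step_callback(self, env_states):
        # runs after simulate(); cache values for _reward / _terminated
        self._last_states = env_states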
RLTaskEnv (RL‑friendly extension)#
- Observation shape: flattens TensorState into a 1D tensor and builds observation_space = Box(num_obs,).
- Action handling: derives action_space from robot.joint_limits and handler.get_joint_names(...). In step(), actions are clamped before being passed to set_dof_targets.
- Auto device: defaults to CUDA if available.
- Auto reset on done: after each step, envs flagged by terminated | time_out are reset in place, and their observations refreshed.
- Initial state acceleration: uses list_state_to_tensor(handler, _get_initial_states()) to convert list states to tensor states for faster resets.
- Info payload: includes privileged_observation, episode_steps, and cached raw observations observations.raw.obs.
- Utilities: unnormalise_action(a) maps actions from [-1, 1] to joint physical ranges (see the sketch below).
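The unnormalise_action(a) mapping is the standard affine rescale (a sketch of the formula; the exact tensor handling inside RLTaskEnv may differ):
def unnormalise_action(a, low, high):
    # Map a in [-1, 1] to the physical joint range [low, high].
    return low + (a + 1.0) * 0.5 * (high - low)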
Differences at a Glance#
| Aspect | BaseTaskEnv | RLTaskEnv |
|---|---|---|
| Observation return | TensorState (not flattened) | Flattened tensor (1D) |
| Auto reset | No | Yes (on done/timeout) |
| Space construction | Decided by subclass or upper layer | Auto‑derived obs/action spaces |
| Action clamping | Decided by subclass or upper layer | Built‑in clamping to joint limits |
| Initial state format | list or tensor | Auto conversion list → tensor |
| Device selection | Passed by user | Auto‑select CUDA/CPU |
7. Summary#
- Tasks = the glue layer between ScenarioCfg/Handler (simulation) and learning algorithms.
- Registry system (get_task_class) makes tasks discoverable by string name.
- Default ScenarioCfg in each task class ensures reproducibility and easy overrides.
- Two migration methods (Quick vs. Structured) cover integration of external tasks.