Isaac Lab 강화학습
이 포스팅은 ‘Isaac Lab‘에 대한 내용을 담고 있습니다.
자료 출처: https://isaac-sim.github.io/IsaacLab/main/source/tutorials/index.html
Isaac Lab 강화학습
이제 Isaac Lab에서 강화학습을 수행하는 방법에 대해 살펴봅니다. 강화학습을 하기에 앞서 여러 환경을 한꺼번에 생성하는 방법을 먼저 살펴보겠습니다.
Manager-Based Base Environment
# Copyright (c) 2022-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""
This script demonstrates how to create a simple environment with a cartpole. It combines the concepts of
scene, action, observation and event managers to create an environment.
"""
"""Launch Isaac Sim Simulator first."""
import argparse
from isaaclab.app import AppLauncher
# add argparse arguments
parser = argparse.ArgumentParser(description="Tutorial on creating a cartpole base environment.")
parser.add_argument("--num_envs", type=int, default=16, help="Number of environments to spawn.")
# append AppLauncher cli args
AppLauncher.add_app_launcher_args(parser)
# parse the arguments
args_cli = parser.parse_args()
# launch omniverse app
app_launcher = AppLauncher(args_cli)
simulation_app = app_launcher.app
"""Rest everything follows."""
import math
import torch
import isaaclab.envs.mdp as mdp
from isaaclab.envs import ManagerBasedEnv, ManagerBasedEnvCfg
from isaaclab.managers import EventTermCfg as EventTerm
from isaaclab.managers import ObservationGroupCfg as ObsGroup
from isaaclab.managers import ObservationTermCfg as ObsTerm
from isaaclab.managers import SceneEntityCfg
from isaaclab.utils import configclass
from isaaclab.scene import InteractiveScene, InteractiveSceneCfg
from isaaclab_assets import CARTPOLE_CFG
from isaaclab.assets import ArticulationCfg, AssetBaseCfg
import isaaclab.sim as sim_utils
@configclass
class CartpoleSceneCfg(InteractiveSceneCfg):
"""Configuration for a cart-pole scene."""
# ground plane
ground = AssetBaseCfg(
prim_path="/World/ground",
spawn=sim_utils.GroundPlaneCfg(size=(100.0, 100.0)),
)
# cartpole
robot: ArticulationCfg = CARTPOLE_CFG.replace(prim_path="{ENV_REGEX_NS}/Robot")
# lights
dome_light = AssetBaseCfg(
prim_path="/World/DomeLight",
spawn=sim_utils.DomeLightCfg(color=(0.9, 0.9, 0.9), intensity=500.0),
)
@configclass
class ActionsCfg:
"""Action specifications for the environment."""
joint_efforts = mdp.JointEffortActionCfg(asset_name="robot", joint_names=["slider_to_cart"], scale=5.0)
@configclass
class ObservationsCfg:
"""Observation specifications for the environment."""
@configclass
class PolicyCfg(ObsGroup):
"""Observations for policy group."""
# observation terms (order preserved)
joint_pos_rel = ObsTerm(func=mdp.joint_pos_rel)
joint_vel_rel = ObsTerm(func=mdp.joint_vel_rel)
def __post_init__(self) -> None:
self.enable_corruption = False
self.concatenate_terms = True
# observation groups
policy: PolicyCfg = PolicyCfg()
@configclass
class EventCfg:
"""Configuration for events."""
# on startup
add_pole_mass = EventTerm(
func=mdp.randomize_rigid_body_mass,
mode="startup",
params={
"asset_cfg": SceneEntityCfg("robot", body_names=["pole"]),
"mass_distribution_params": (0.1, 0.5),
"operation": "add",
},
)
# on reset
reset_cart_position = EventTerm(
func=mdp.reset_joints_by_offset,
mode="reset",
params={
"asset_cfg": SceneEntityCfg("robot", joint_names=["slider_to_cart"]),
"position_range": (-1.0, 1.0),
"velocity_range": (-0.1, 0.1),
},
)
reset_pole_position = EventTerm(
func=mdp.reset_joints_by_offset,
mode="reset",
params={
"asset_cfg": SceneEntityCfg("robot", joint_names=["cart_to_pole"]),
"position_range": (-0.125 * math.pi, 0.125 * math.pi),
"velocity_range": (-0.01 * math.pi, 0.01 * math.pi),
},
)
@configclass
class CartpoleEnvCfg(ManagerBasedEnvCfg):
"""Configuration for the cartpole environment."""
# Scene settings
scene = CartpoleSceneCfg(num_envs=1024, env_spacing=2.5)
# Basic settings
observations = ObservationsCfg()
actions = ActionsCfg()
events = EventCfg()
def __post_init__(self):
"""Post initialization."""
# viewer settings
self.viewer.eye = [4.5, 0.0, 6.0]
self.viewer.lookat = [0.0, 0.0, 2.0]
# step settings
self.decimation = 4 # env step every 4 sim steps: 200Hz / 4 = 50Hz
# simulation settings
self.sim.dt = 0.005 # sim step every 5ms: 200Hz
def main():
"""Main function."""
# parse the arguments
env_cfg = CartpoleEnvCfg()
env_cfg.scene.num_envs = args_cli.num_envs
# setup base environment
env = ManagerBasedEnv(cfg=env_cfg)
# simulate physics
count = 0
while simulation_app.is_running():
with torch.inference_mode():
# reset
if count % 300 == 0:
count = 0
env.reset()
print("-" * 80)
print("[INFO]: Resetting environment...")
# sample random actions
joint_efforts = torch.randn_like(env.action_manager.action)
# step the environment
obs, _ = env.step(joint_efforts)
# print current orientation of pole
print("[Env 0]: Pole joint: ", obs["policy"][0][1].item())
# update counter
count += 1
# close the environment
env.close()
if __name__ == "__main__":
# run the main function
main()
# close sim app
simulation_app.close()
Isaac Lab에서 환경을 작성하는 방법은 크게 2가지가 있습니다. 하나는 Manager-Based 방식이고, 다른 하나는 Direct Workflow 방식입니다. 두 가지 방식으로 환경을 작성하는 방법을 모두 다룰건데, 튜토리얼의 순서대로 Manager-Based 방식 먼저 다루겠습니다. 위 파이썬 코드가 바로 Manager-Based 스타일로 적힌 코드입니다. 튜토리얼 코드랑 조금 차이가 있는데, 바로 아래의 Designing Scene 부분을 튜토리얼에서는 별도 파일로 따로 빼 설명합니다. 그런데 이 코드에서는 그 부분을 가져와 함께 적어두었습니다.
Designing Scene
from isaaclab.scene import InteractiveScene, InteractiveSceneCfg
from isaaclab_assets import CARTPOLE_CFG
from isaaclab.assets import ArticulationCfg, AssetBaseCfg
import isaaclab.sim as sim_utils
@configclass
class CartpoleSceneCfg(InteractiveSceneCfg):
"""Configuration for a cart-pole scene."""
# ground plane
ground = AssetBaseCfg(
prim_path="/World/ground",
spawn=sim_utils.GroundPlaneCfg(size=(100.0, 100.0)),
)
# cartpole
robot: ArticulationCfg = CARTPOLE_CFG.replace(prim_path="{ENV_REGEX_NS}/Robot")
# lights
dome_light = AssetBaseCfg(
prim_path="/World/DomeLight",
spawn=sim_utils.DomeLightCfg(color=(0.9, 0.9, 0.9), intensity=500.0),
)
일단 병렬화하고자 하는 환경의 모양새를 결정합니다. 환경 configuration환경을 만들 때는 위와 같이 InteractiveSceneCfg
를 상속받도록 해줍니다. 평면이랑 조명같은 경우에 CartPole 환경에서 물리적 상호작용을 하지 않을 뿐더러, 그렇다 하더라도 정적 요소이므로 AssetBaseCfg
로 설정합니다. CartPole의 경우 동적으로 계속 변하는 개체이기 때문에 ArticulationCfg
로 설정해 동적인 상호작용을 할 수 있도록 합니다.
Action
@configclass
class ActionsCfg:
"""Action specifications for the environment."""
joint_efforts = mdp.JointEffortActionCfg(asset_name="robot", joint_names=["slider_to_cart"], scale=5.0)
이전에는 set_joint_effort_target(efforts)
를 사용해 액션을 전달했는데, 위와 같이 클래스를 선언해두면 action_manager
를 사용해 액션을 조절할 수 있습니다. 위는 CartPole 예시이기 때문에 joint_efforts
만 있는데, 액션이 여러 개인 경우 여러 액션 변수가 있을 수 있습니다.
Observation
@configclass
class ObservationsCfg:
"""Observation specifications for the environment."""
@configclass
class PolicyCfg(ObsGroup):
"""Observations for policy group."""
# observation terms (order preserved)
joint_pos_rel = ObsTerm(func=mdp.joint_pos_rel)
joint_vel_rel = ObsTerm(func=mdp.joint_vel_rel)
def __post_init__(self) -> None:
self.enable_corruption = False
self.concatenate_terms = True
# observation groups
policy: PolicyCfg = PolicyCfg()
에이전트가 환경을 어떻게 관측할지를 결정합니다. 클래스 내부에 PolicyCfg
라는 클래스를 따로 만들어서, 행동 선택에 필요한 관측 데이터를 모아둡니다. 관측 정보로는 상대 관절 위치와, 상대 관절 속도를 저장합니다. __post_init__
함수가 선언되어 있는데, 이는 클래스 초기화 이후에 실행되는 함수입니다. enable_corruption
은 데이터에 인위적인 왜곡을 추가할 것인지를 선택하는 옵션이고, 그 아래 concatenate_terms
는 관측 정보들을 하나로 concat한 벡터로 다룰 것인지를 선택하는 옵션입니다. 마지막으로 ObservationsCfg
는 policy
라는 이름의 속성을 가지게 됩니다.
Event
@configclass
class EventCfg:
"""Configuration for events."""
# on startup강
add_pole_mass = EventTerm(
func=mdp.randomize_rigid_body_mass,
mode="startup",
params={
"asset_cfg": SceneEntityCfg("robot", body_names=["pole"]),
"mass_distribution_params": (0.1, 0.5),
"operation": "add",
},
)
# on reset
reset_cart_position = EventTerm(
func=mdp.reset_joints_by_offset,
mode="reset",
params={
"asset_cfg": SceneEntityCfg("robot", joint_names=["slider_to_cart"]),
"position_range": (-1.0, 1.0),
"velocity_range": (-0.1, 0.1),
},
)
reset_pole_position = EventTerm(
func=mdp.reset_joints_by_offset,
mode="reset",
params={
"asset_cfg": SceneEntityCfg("robot", joint_names=["cart_to_pole"]),
"position_range": (-0.125 * math.pi, 0.125 * math.pi),
"velocity_range": (-0.01 * math.pi, 0.01 * math.pi),
},
)
EventCfg
는 특정 이벤트가 발생했을 때 실행해야 하는 동작들을 포함합니다. mode
의 종류에는 "startup"
, "reset"
, "interval"
이 있고, 각각은 환경 시작 시 발생하는 이벤트, 환경 종료, 재설정 시 발생하는 이벤트, 지정된 간격으로 실행되는 이벤트를 의미합니다.
Connect all
@configclass
class CartpoleEnvCfg(ManagerBasedEnvCfg):
"""Configuration for the cartpole environment."""
# Scene settings
scene = CartpoleSceneCfg(num_envs=1024, env_spacing=2.5)
# Basic settings
observations = ObservationsCfg()
actions = ActionsCfg()
events = EventCfg()
def __post_init__(self):
"""Post initialization."""
# viewer settings
self.viewer.eye = [4.5, 0.0, 6.0]
self.viewer.lookat = [0.0, 0.0, 2.0]
# step settings
self.decimation = 4 # env step every 4 sim steps: 200Hz / 4 = 50Hz
# simulation settings
self.sim.dt = 0.005 # sim step every 5ms: 200Hz
위에서 정의한 클래스들을 모두 한데 모아 하나의 클래스로 구성합니다. 위의 decimation
은 환경 스텝 갱신 간격을 의미합니다.
main
def main():
"""Main function."""
# parse the arguments
env_cfg = CartpoleEnvCfg()
env_cfg.scene.num_envs = args_cli.num_envs
# setup base environment
env = ManagerBasedEnv(cfg=env_cfg)
# simulate physics
count = 0
while simulation_app.is_running():
with torch.inference_mode():
# reset
if count % 300 == 0:
count = 0
env.reset()
print("-" * 80)
print("[INFO]: Resetting environment...")
# sample random actions
joint_efforts = torch.randn_like(env.action_manager.action)
# step the environment
obs, _ = env.step(joint_efforts)
# print current orientation of pole
print("[Env 0]: Pole joint: ", obs["policy"][0][1].item())
# update counter
count += 1
# close the environment
env.close()
마지막으로 main()
에서 configuration 클래스 객체를 선언하고, configuration 객체를 인자로 넣어 ManagerBasedEnv
의 환경 객체를 만들어줍니다. 그리고 이어지는 코드를 통해 환경을 실행시키면 아래와 같은 화면을 볼 수 있습니다.
Direct Workflow RL Environment
Manager-Based로 만드는 방법 이외에 Direct 방식으로도 환경 스크립팅을 만들 수 있습니다. Direct 방식을 사용할 경우 전체 보상, 관찰 기능을 보다 직접적으로 구현할 수 있기 때문에 보다 구체적으로 코드를 확인할 수 있습니다.
# Copyright (c) 2022-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations
import math
import torch
from collections.abc import Sequence
from isaaclab_assets.robots.cartpole import CARTPOLE_CFG
import isaaclab.sim as sim_utils
from isaaclab.assets import Articulation, ArticulationCfg
from isaaclab.envs import DirectRLEnv, DirectRLEnvCfg
from isaaclab.scene import InteractiveSceneCfg
from isaaclab.sim import SimulationCfg
from isaaclab.sim.spawners.from_files import GroundPlaneCfg, spawn_ground_plane
from isaaclab.utils import configclass
from isaaclab.utils.math import sample_uniform
@configclass
class CartpoleEnvCfg(DirectRLEnvCfg):
# env
decimation = 2
episode_length_s = 5.0
action_scale = 100.0 # [N]
action_space = 1
observation_space = 4
state_space = 0
# simulation
sim: SimulationCfg = SimulationCfg(dt=1 / 120, render_interval=decimation)
# robot
robot_cfg: ArticulationCfg = CARTPOLE_CFG.replace(prim_path="/World/envs/env_.*/Robot")
cart_dof_name = "slider_to_cart"
pole_dof_name = "cart_to_pole"
# scene
scene: InteractiveSceneCfg = InteractiveSceneCfg(num_envs=4096, env_spacing=4.0, replicate_physics=True)
# reset
max_cart_pos = 3.0 # the cart is reset if it exceeds that position [m]
initial_pole_angle_range = [-0.25, 0.25] # the range in which the pole angle is sampled from on reset [rad]
# reward scales
rew_scale_alive = 1.0
rew_scale_terminated = -2.0
rew_scale_pole_pos = -1.0
rew_scale_cart_vel = -0.01
rew_scale_pole_vel = -0.005
class CartpoleEnv(DirectRLEnv):
cfg: CartpoleEnvCfg
def __init__(self, cfg: CartpoleEnvCfg, render_mode: str | None = None, **kwargs):
super().__init__(cfg, render_mode, **kwargs)
self._cart_dof_idx, _ = self.cartpole.find_joints(self.cfg.cart_dof_name)
self._pole_dof_idx, _ = self.cartpole.find_joints(self.cfg.pole_dof_name)
self.action_scale = self.cfg.action_scale
self.joint_pos = self.cartpole.data.joint_pos
self.joint_vel = self.cartpole.data.joint_vel
def _setup_scene(self):
self.cartpole = Articulation(self.cfg.robot_cfg)
# add ground plane
spawn_ground_plane(prim_path="/World/ground", cfg=GroundPlaneCfg())
# clone and replicate
self.scene.clone_environments(copy_from_source=False)
# add articulation to scene
self.scene.articulations["cartpole"] = self.cartpole
# add lights
light_cfg = sim_utils.DomeLightCfg(intensity=2000.0, color=(0.75, 0.75, 0.75))
light_cfg.func("/World/Light", light_cfg)
def _pre_physics_step(self, actions: torch.Tensor) -> None:
self.actions = self.action_scale * actions.clone()
def _apply_action(self) -> None:
self.cartpole.set_joint_effort_target(self.actions, joint_ids=self._cart_dof_idx)
def _get_observations(self) -> dict:
obs = torch.cat(
(
self.joint_pos[:, self._pole_dof_idx[0]].unsqueeze(dim=1),
self.joint_vel[:, self._pole_dof_idx[0]].unsqueeze(dim=1),
self.joint_pos[:, self._cart_dof_idx[0]].unsqueeze(dim=1),
self.joint_vel[:, self._cart_dof_idx[0]].unsqueeze(dim=1),
),
dim=-1,
)
observations = {"policy": obs}
return observations
def _get_rewards(self) -> torch.Tensor:
total_reward = compute_rewards(
self.cfg.rew_scale_alive,
self.cfg.rew_scale_terminated,
self.cfg.rew_scale_pole_pos,
self.cfg.rew_scale_cart_vel,
self.cfg.rew_scale_pole_vel,
self.joint_pos[:, self._pole_dof_idx[0]],
self.joint_vel[:, self._pole_dof_idx[0]],
self.joint_pos[:, self._cart_dof_idx[0]],
self.joint_vel[:, self._cart_dof_idx[0]],
self.reset_terminated,
)
return total_reward
def _get_dones(self) -> tuple[torch.Tensor, torch.Tensor]:
self.joint_pos = self.cartpole.data.joint_pos
self.joint_vel = self.cartpole.data.joint_vel
time_out = self.episode_length_buf >= self.max_episode_length - 1
out_of_bounds = torch.any(torch.abs(self.joint_pos[:, self._cart_dof_idx]) > self.cfg.max_cart_pos, dim=1)
out_of_bounds = out_of_bounds | torch.any(torch.abs(self.joint_pos[:, self._pole_dof_idx]) > math.pi / 2, dim=1)
return out_of_bounds, time_out
def _reset_idx(self, env_ids: Sequence[int] | None):
if env_ids is None:
env_ids = self.cartpole._ALL_INDICES
super()._reset_idx(env_ids)
joint_pos = self.cartpole.data.default_joint_pos[env_ids]
joint_pos[:, self._pole_dof_idx] += sample_uniform(
self.cfg.initial_pole_angle_range[0] * math.pi,
self.cfg.initial_pole_angle_range[1] * math.pi,
joint_pos[:, self._pole_dof_idx].shape,
joint_pos.device,
)
joint_vel = self.cartpole.data.default_joint_vel[env_ids]
default_root_state = self.cartpole.data.default_root_state[env_ids]
default_root_state[:, :3] += self.scene.env_origins[env_ids]
self.joint_pos[env_ids] = joint_pos
self.joint_vel[env_ids] = joint_vel
self.cartpole.write_root_pose_to_sim(default_root_state[:, :7], env_ids)
self.cartpole.write_root_velocity_to_sim(default_root_state[:, 7:], env_ids)
self.cartpole.write_joint_state_to_sim(joint_pos, joint_vel, None, env_ids)
@torch.jit.script
def compute_rewards(
rew_scale_alive: float,
rew_scale_terminated: float,
rew_scale_pole_pos: float,
rew_scale_cart_vel: float,
rew_scale_pole_vel: float,
pole_pos: torch.Tensor,
pole_vel: torch.Tensor,
cart_pos: torch.Tensor,
cart_vel: torch.Tensor,
reset_terminated: torch.Tensor,
):
rew_alive = rew_scale_alive * (1.0 - reset_terminated.float())
rew_termination = rew_scale_terminated * reset_terminated.float()
rew_pole_pos = rew_scale_pole_pos * torch.sum(torch.square(pole_pos).unsqueeze(dim=1), dim=-1)
rew_cart_vel = rew_scale_cart_vel * torch.sum(torch.abs(cart_vel).unsqueeze(dim=1), dim=-1)
rew_pole_vel = rew_scale_pole_vel * torch.sum(torch.abs(pole_vel).unsqueeze(dim=1), dim=-1)
total_reward = rew_alive + rew_termination + rew_pole_pos + rew_cart_vel + rew_pole_vel
return total_reward
위는 CartPole 환경을 Direct 방식으로 적은 코드 전체입니다. 각 부분이 어떤 역할을 갖는지 아래에서 자세하게 살펴봅니다.
CartpoleEnvCfg
@configclass
class CartpoleEnvCfg(DirectRLEnvCfg):
# env
decimation = 2
episode_length_s = 5.0
action_scale = 100.0 # [N]
action_space = 1
observation_space = 4
state_space = 0
# simulation
sim: SimulationCfg = SimulationCfg(dt=1 / 120, render_interval=decimation)
# robot
robot_cfg: ArticulationCfg = CARTPOLE_CFG.replace(prim_path="/World/envs/env_.*/Robot")
cart_dof_name = "slider_to_cart"
pole_dof_name = "cart_to_pole"
# scene
scene: InteractiveSceneCfg = InteractiveSceneCfg(num_envs=4096, env_spacing=4.0, replicate_physics=True)
# reset
max_cart_pos = 3.0 # the cart is reset if it exceeds that position [m]
initial_pole_angle_range = [-0.25, 0.25] # the range in which the pole angle is sampled from on reset [rad]
# reward scales
rew_scale_alive = 1.0
rew_scale_terminated = -2.0
rew_scale_pole_pos = -1.0
rew_scale_cart_vel = -0.01
rew_scale_pole_vel = -0.005
config 클래스를 만들 때, DirectRLEnvCfg
클래스를 상속받도록 설정해줍니다. 환경의 간단한 틀과 보상 조건들을 설정해줍니다. 위에서 부터 하나하나 어떤 의미인지 살펴보겠습니다.
# env
decimation = 2
episode_length_s = 5.0
action_scale = 100.0 # [N]
action_space = 1
observation_space = 4
state_space = 0
위와 같이 decimation과 에피소드 길이, 액션 스케일과 공간 등등의 옵션들을 설정합니다. 이어지는 세부 클래스 객체 선언 과정에서 위 변수들이 사용됩니다.
# simulation
sim: SimulationCfg = SimulationCfg(dt=1 / 120, render_interval=decimation)
decimation 설정값을 바로 시뮬레이션 configuration 클래스의 객체인 sim
을 만들 때 사용합니다.
# robot
robot_cfg: ArticulationCfg = CARTPOLE_CFG.replace(prim_path="/World/envs/env_.*/Robot")
cart_dof_name = "slider_to_cart"
pole_dof_name = "cart_to_pole"
robot configuration을 생성해주는데, CARTPOLE_CFG
가 ArticulationCfg
클래스이기 때문에 robot_cfg: ArticulationCfg
라고 표현되어 있습니다. .replace(prim_path="/World/envs/env_.*/Robot")
에서 경로 표현을 할 때 중간에.*
가 있는데, 이는 환경이 여러 개가 사용될 때 각 환경별로 다르게 경로를 설정하기 위한 표현입니다. 실제로 코드를 실행시켜보면 아래와 같이 env_0
, env_1
, … 이런 식으로 경로가 할당되는 것을 확인할 수 있습니다.
이어서 Cart와 Pole에 대한 dof 이름을 설정해주는데, 이것은 미리 설정되어있는 CartPole 로봇의 조인트 이름입니다. 로봇 설정에 맞는 이름으로 변수에 값을 할당합니다.
# scene
scene: InteractiveSceneCfg = InteractiveSceneCfg(num_envs=4096, env_spacing=4.0, replicate_physics=True)
# reset
max_cart_pos = 3.0 # the cart is reset if it exceeds that position [m]
initial_pole_angle_range = [-0.25, 0.25] # the range in which the pole angle is sampled from on reset [rad]
InteractiveSceneCfg
클래스의 객체를 만들어줍니다. 그리고 이어서 환경 리셋 조건과 초기화 범위을 결정합니다. 위 코드에서는 좌우로 3m를 초과할 경우 초기화하고, [-0.25, 0.25] 사이 범위에서 시작 Pole 각도를 결정하기 위해 각 값들을 변수에 저장합니다. 실제로 초기화를 하고, 시작 각도를 샘플링해 정해주는 부분은 코드의 한참 아래 부분에서 확인할 수 있습니다.
# reward scales
rew_scale_alive = 1.0
rew_scale_terminated = -2.0
rew_scale_pole_pos = -1.0
rew_scale_cart_vel = -0.01
rew_scale_pole_vel = -0.005
마지막으로 환경 보상을 결정합니다. 매 스텝마다의 생존 보상, 실패시 받게되는 음수 보상 등등 Cart가 막대 중심잡기 태스크를 잘 학습할 수 있도록 적절한 값으로 결정합니다.
CartpoleEnv
class CartpoleEnv(DirectRLEnv):
cfg: CartpoleEnvCfg
def __init__(self, cfg: CartpoleEnvCfg, render_mode: str | None = None, **kwargs):
super().__init__(cfg, render_mode, **kwargs)
self._cart_dof_idx, _ = self.cartpole.find_joints(self.cfg.cart_dof_name)
self._pole_dof_idx, _ = self.cartpole.find_joints(self.cfg.pole_dof_name)
self.action_scale = self.cfg.action_scale
self.joint_pos = self.cartpole.data.joint_pos
self.joint_vel = self.cartpole.data.joint_vel
def _setup_scene(self):
self.cartpole = Articulation(self.cfg.robot_cfg)
# add ground plane
spawn_ground_plane(prim_path="/World/ground", cfg=GroundPlaneCfg())
# clone and replicate
self.scene.clone_environments(copy_from_source=False)
# add articulation to scene
self.scene.articulations["cartpole"] = self.cartpole
# add lights
light_cfg = sim_utils.DomeLightCfg(intensity=2000.0, color=(0.75, 0.75, 0.75))
light_cfg.func("/World/Light", light_cfg)
def _pre_physics_step(self, actions: torch.Tensor) -> None:
self.actions = self.action_scale * actions.clone()
def _apply_action(self) -> None:
self.cartpole.set_joint_effort_target(self.actions, joint_ids=self._cart_dof_idx)
def _get_observations(self) -> dict:
obs = torch.cat(
(
self.joint_pos[:, self._pole_dof_idx[0]].unsqueeze(dim=1),
self.joint_vel[:, self._pole_dof_idx[0]].unsqueeze(dim=1),
self.joint_pos[:, self._cart_dof_idx[0]].unsqueeze(dim=1),
self.joint_vel[:, self._cart_dof_idx[0]].unsqueeze(dim=1),
),
dim=-1,
)
observations = {"policy": obs}
return observations
def _get_rewards(self) -> torch.Tensor:
total_reward = compute_rewards(
self.cfg.rew_scale_alive,
self.cfg.rew_scale_terminated,
self.cfg.rew_scale_pole_pos,
self.cfg.rew_scale_cart_vel,
self.cfg.rew_scale_pole_vel,
self.joint_pos[:, self._pole_dof_idx[0]],
self.joint_vel[:, self._pole_dof_idx[0]],
self.joint_pos[:, self._cart_dof_idx[0]],
self.joint_vel[:, self._cart_dof_idx[0]],
self.reset_terminated,
)
return total_reward
def _get_dones(self) -> tuple[torch.Tensor, torch.Tensor]:
self.joint_pos = self.cartpole.data.joint_pos
self.joint_vel = self.cartpole.data.joint_vel
time_out = self.episode_length_buf >= self.max_episode_length - 1
out_of_bounds = torch.any(torch.abs(self.joint_pos[:, self._cart_dof_idx]) > self.cfg.max_cart_pos, dim=1)
out_of_bounds = out_of_bounds | torch.any(torch.abs(self.joint_pos[:, self._pole_dof_idx]) > math.pi / 2, dim=1)
return out_of_bounds, time_out
def _reset_idx(self, env_ids: Sequence[int] | None):
if env_ids is None:
env_ids = self.cartpole._ALL_INDICES
super()._reset_idx(env_ids)
joint_pos = self.cartpole.data.default_joint_pos[env_ids]
joint_pos[:, self._pole_dof_idx] += sample_uniform(
self.cfg.initial_pole_angle_range[0] * math.pi,
self.cfg.initial_pole_angle_range[1] * math.pi,
joint_pos[:, self._pole_dof_idx].shape,
joint_pos.device,
)
joint_vel = self.cartpole.data.default_joint_vel[env_ids]
default_root_state = self.cartpole.data.default_root_state[env_ids]
default_root_state[:, :3] += self.scene.env_origins[env_ids]
self.joint_pos[env_ids] = joint_pos
self.joint_vel[env_ids] = joint_vel
self.cartpole.write_root_pose_to_sim(default_root_state[:, :7], env_ids)
self.cartpole.write_root_velocity_to_sim(default_root_state[:, 7:], env_ids)
self.cartpole.write_joint_state_to_sim(joint_pos, joint_vel, None, env_ids)
앞전에 다룬 CartpoleEnvCfg
는 환경의 틀을 설정해두는 configuration 클래스였고, 위의 CartpoleEnv
에서 환경의 보다 구체적인 설정을 다룹니다.
class CartpoleEnv(DirectRLEnv):
cfg: CartpoleEnvCfg
일단 먼저 클래스를 선언할 때, DirectRLEnv
를 상속받게 설정해야 하고, 이 클래스가 받는 설정이 CartpoleEnvCfg
타입이어야 함을 명시해둡니다.
def __init__(self, cfg: CartpoleEnvCfg, render_mode: str | None = None, **kwargs):
super().__init__(cfg, render_mode, **kwargs)
self._cart_dof_idx, _ = self.cartpole.find_joints(self.cfg.cart_dof_name)
self._pole_dof_idx, _ = self.cartpole.find_joints(self.cfg.pole_dof_name)
self.action_scale = self.cfg.action_scale
self.joint_pos = self.cartpole.data.joint_pos
self.joint_vel = self.cartpole.data.joint_vel
def _setup_scene(self):
self.cartpole = Articulation(self.cfg.robot_cfg)
# add ground plane
spawn_ground_plane(prim_path="/World/ground", cfg=GroundPlaneCfg())
# clone and replicate
self.scene.clone_environments(copy_from_source=False)
# add articulation to scene
self.scene.articulations["cartpole"] = self.cartpole
# add lights
light_cfg = sim_utils.DomeLightCfg(intensity=2000.0, color=(0.75, 0.75, 0.75))
light_cfg.func("/World/Light", light_cfg)
super().__init__(cfg, render_mode, **kwargs)
를 통해 초기화를 할 때에 DirectRLEnv
의 초기화 과정을 동일하게 수행합니다. 해당 초기화 과정에서 _setup_scene
를 실행하는 부분이 있고, _setup_scene
가 CartpoleEnv
에서 동일한 이름으로 재정의되기 때문에 CartpoleEnv
에서 정의한 _setup_scene
가 실행됩니다. 저 한 줄 안에 굉장히 많은 내용들이 숨어있는데, cfg
를 받아 self.cfg
에 저장하는 내용도 있고, 또 그 self.cfg
를 사용해 self.scene
를 선언하는 부분 등등이 내부적으로 수행됩니다.
self.cartpole
는 Articulation
클래스의 객체인데요, 관절의 이름을 넣어주면, 해당 조인트의 인덱스를 반환해주는 .find_joints
메서드가 있습니다. 이거를 사용해서 self._cart_dof_idx
와 self._cart_dof_idx
에 인덱스를 저장해줍니다. 그리고 조인트의 위치와 속도에 대한 정보도 따로 클래스 변수를 선언해 저장해둡니다.
_setup_scene
에서는 CartPole 로봇을 만들고 환경 복제하는 코드, 그리고 지면과 조명 설정 등등 장면을 어떻게 구성할지를 결정합니다.
def _pre_physics_step(self, actions: torch.Tensor) -> None:
self.actions = self.action_scale * actions.clone()
def _apply_action(self) -> None:
self.cartpole.set_joint_effort_target(self.actions, joint_ids=self._cart_dof_idx)
def _get_observations(self) -> dict:
obs = torch.cat(
(
self.joint_pos[:, self._pole_dof_idx[0]].unsqueeze(dim=1),
self.joint_vel[:, self._pole_dof_idx[0]].unsqueeze(dim=1),
self.joint_pos[:, self._cart_dof_idx[0]].unsqueeze(dim=1),
self.joint_vel[:, self._cart_dof_idx[0]].unsqueeze(dim=1),
),
dim=-1,
)
observations = {"policy": obs}
return observations
_pre_physics_step
은 RL 스텝이 실행될 때마다 액션에다가 전처리를 해주는 함수입니다. 여기에서는 decimation이 2로 설정되어 있으니까 물리 단계가 2번 수행될 동안 1번 실행되고, self.action_scale
가 100으로 설정되어 있어서 기존 행동에 100을 곱해주고 self.actions
에 저장합니다.
_apply_action
은 물리 단계마다 매번 호출되는 함수로 _pre_physics_step
에서 설정한 액션대로 CartPole의 조인트 토크값을 부여합니다. Cart의 이동만 결정할 수 있기 때문에 joint_ids
로 self._cart_dof_idx
만 들어갑니다.
_get_observations
은 에이전트에게 제공할 관측을 설정하는 함수입니다. 예를 들어, 첫 번째 줄에서 self.joint_pos[:, self._pole_dof_idx[0]].unsqueeze(dim=1)
로 되어 있는데, 모든 환경들의 관측 정보를 다 가져와야해서 앞에 :
라고 적어둡니다. 그리고 뒤에는 다 0번째 인덱스를 가져오도록 되어있는데, 이는 self._pole_dof_idx
가 처음에 정의될 때 원소가 1개 들어있는 리스트 형식으로 만들어지기 때문에, 안에 있는 원소 값인 인덱스만을 가져오기 위해서 [0]
가 붙어 사용됩니다. observations = {"policy": obs}
를 사용해 딕셔너리로 만든 다음에 딕셔너리를 함수의 반환값으로 정해두었는데, 여기에서는 사용되지 않았지만 critic
을 위한 관측정보를 따로 분류하는 경우가 있습니다. 그런 경우에는 딕셔너리가 두 개의 원소를 가지는 형태로 반환됩니다.
def _get_rewards(self) -> torch.Tensor:
total_reward = compute_rewards(
self.cfg.rew_scale_alive,
self.cfg.rew_scale_terminated,
self.cfg.rew_scale_pole_pos,
self.cfg.rew_scale_cart_vel,
self.cfg.rew_scale_pole_vel,
self.joint_pos[:, self._pole_dof_idx[0]],
self.joint_vel[:, self._pole_dof_idx[0]],
self.joint_pos[:, self._cart_dof_idx[0]],
self.joint_vel[:, self._cart_dof_idx[0]],
self.reset_terminated,
)
return total_reward
_get_rewards
는 보상을 계산해주는 함수입니다. 동일한 파이썬 코드 파일 하단에 정의해둔 compute_rewards
함수를 사용해 여러 상태정보들을 사용해 최종적인 보상을 계산하고 각 환경마다 계산된 보상의 텐서를 반환합니다.
def _get_dones(self) -> tuple[torch.Tensor, torch.Tensor]:
self.joint_pos = self.cartpole.data.joint_pos
self.joint_vel = self.cartpole.data.joint_vel
time_out = self.episode_length_buf >= self.max_episode_length - 1
out_of_bounds = torch.any(torch.abs(self.joint_pos[:, self._cart_dof_idx]) > self.cfg.max_cart_pos, dim=1)
out_of_bounds = out_of_bounds | torch.any(torch.abs(self.joint_pos[:, self._pole_dof_idx]) > math.pi / 2, dim=1)
return out_of_bounds, time_out
매 스텝마다 에피소드의 종료 조건을 판정합니다. 이동 범위를 벗어나거나 막대가 쓰러진 경우는 out_of_bounds
에 인덱스를 저장하고, 시간이 초과해 종료되는 경우는 time_out
에 인덱스를 저장해 반환합니다.
def _reset_idx(self, env_ids: Sequence[int] | None):
if env_ids is None:
env_ids = self.cartpole._ALL_INDICES
super()._reset_idx(env_ids)
joint_pos = self.cartpole.data.default_joint_pos[env_ids]
joint_pos[:, self._pole_dof_idx] += sample_uniform(
self.cfg.initial_pole_angle_range[0] * math.pi,
self.cfg.initial_pole_angle_range[1] * math.pi,
joint_pos[:, self._pole_dof_idx].shape,
joint_pos.device,
)
joint_vel = self.cartpole.data.default_joint_vel[env_ids]
default_root_state = self.cartpole.data.default_root_state[env_ids]
default_root_state[:, :3] += self.scene.env_origins[env_ids]
self.joint_pos[env_ids] = joint_pos
self.joint_vel[env_ids] = joint_vel
self.cartpole.write_root_pose_to_sim(default_root_state[:, :7], env_ids)
self.cartpole.write_root_velocity_to_sim(default_root_state[:, 7:], env_ids)
self.cartpole.write_joint_state_to_sim(joint_pos, joint_vel, None, env_ids)
에피소드 종류 후 환경들을 재설정하는 함수입니다. 막대기의 기울기에만 조금의 랜덤성을 부여하고 나머지는 설정된 default 값으로 재설정합니다.
./isaaclab.sh -p scripts/reinforcement_learning/rl_games/train.py --task=Isaac-Cartpole-Direct-v0
위에서 정의된 환경 파일을 사용해 코드를 실행시키면 여러 병렬 CartPole 환경을 만들어 강화학습이 수행되는 것을 확인할 수 있습니다(환경을 등록하고 학습을 수행하는 내용은 아래에서 다룹니다.).
환경 등록
./isaaclab.sh -p scripts/environments/random_agent.py --task Isaac-Cartpole-v0 --num_envs 32
위에서 Direct 방식으로 환경을 만들고 실행하는 코드는 위와 같습니다. --task=Isaac-Cartpole-Direct-v0
를 인자로 넣어줘서 우리가 위에서 만들었던 환경을 불러와 학습을 수행하게 됩니다. train.py
내부를 보면, 실행시 인자로 받은 태스크에 맞는 환경을 만들고 학습을 수행하도록 코드가 짜여져 있습니다. 위와 같이 실행시키기 위해서는 우리가 만든 환경 파일을 Isaac-Cartpole-Direct-v0
라는 이름으로 등록해야 합니다.
이전에 CartPole을 만들었던 디렉터리의 구조입니다. 동일한 디렉터리에 __init__.py
초기화 파일이 있는 것을 확인할 수 있습니다. 바로 저기에서 환경 등록 절차를 수행하게 됩니다.
import gymnasium as gym
from . import agents
##
# Register Gym environments.
##
gym.register(
id="Isaac-Cartpole-Direct-v0",
entry_point=f"{__name__}.cartpole_env:CartpoleEnv",
disable_env_checker=True,
kwargs={
"env_cfg_entry_point": f"{__name__}.cartpole_env:CartpoleEnvCfg",
"rl_games_cfg_entry_point": f"{agents.__name__}:rl_games_ppo_cfg.yaml",
"rsl_rl_cfg_entry_point": f"{agents.__name__}.rsl_rl_ppo_cfg:CartpolePPORunnerCfg",
"skrl_cfg_entry_point": f"{agents.__name__}:skrl_ppo_cfg.yaml",
"sb3_cfg_entry_point": f"{agents.__name__}:sb3_ppo_cfg.yaml",
},
)
gym.register
라는 함수를 사용해 환경을 글로벌 레지스트리에 저장합니다.
id
: 환경 등록 시 설정하는 고유 ID를 설정합니다.entry_point
:__name__
을 사용해 환경 클래스가 정의된 파이썬 파일의 주소를 표현하고, 그 뒤에 해당 파일에서 가져올 클래스 이름을 적습니다.disable_env_checker
: 환경이 잘 만들어졌는지 확인하는 기능입니다.kwargs
: 환경을 생성할 때, 필요한 여러 인자들을 딕셔너리 형태로 제공합니다.
# Copyright (c) 2022-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Script to an environment with random action agent."""
"""Launch Isaac Sim Simulator first."""
import argparse
from isaaclab.app import AppLauncher
# add argparse arguments
parser = argparse.ArgumentParser(description="Random agent for Isaac Lab environments.")
parser.add_argument(
"--disable_fabric", action="store_true", default=False, help="Disable fabric and use USD I/O operations."
)
parser.add_argument("--num_envs", type=int, default=None, help="Number of environments to simulate.")
parser.add_argument("--task", type=str, default=None, help="Name of the task.")
# append AppLauncher cli args
AppLauncher.add_app_launcher_args(parser)
# parse the arguments
args_cli = parser.parse_args()
# launch omniverse app
app_launcher = AppLauncher(args_cli)
simulation_app = app_launcher.app
"""Rest everything follows."""
import gymnasium as gym
import torch
import isaaclab_tasks # noqa: F401
from isaaclab_tasks.utils import parse_env_cfg
def main():
"""Random actions agent with Isaac Lab environment."""
# create environment configuration
env_cfg = parse_env_cfg(
args_cli.task, device=args_cli.device, num_envs=args_cli.num_envs, use_fabric=not args_cli.disable_fabric
)
# create environment
env = gym.make(args_cli.task, cfg=env_cfg)
# print info (this is vectorized environment)
print(f"[INFO]: Gym observation space: {env.observation_space}")
print(f"[INFO]: Gym action space: {env.action_space}")
# reset environment
env.reset()
# simulate environment
while simulation_app.is_running():
# run everything in inference mode
with torch.inference_mode():
# sample actions from -1 to 1
actions = 2 * torch.rand(env.action_space.shape, device=env.unwrapped.device) - 1
# apply actions
env.step(actions)
# close the simulator
env.close()
if __name__ == "__main__":
# run the main function
main()
# close sim app
simulation_app.close()
random_agent.py
입니다. 위 코드를 실행시킬 때 인자중 하나로 --task Isaac-Cartpole-v0
를 주는데, 환경 등록을 해주었기 때문에 우리가 만들었던 환경이 그대로 실행됩니다.
댓글남기기