
Add guguji_rl reinforcement learning training code, and add a docs directory describing the full RL workflow and its caveats

corvin_zhang 1 week ago
parent
commit
019c6618f6

+ 5 - 0
.gitignore

@@ -9,3 +9,8 @@
 # Python runtime cache
 __pycache__/
 *.pyc
+
+# RL training project caches and outputs
+/guguji_rl/.venv/
+/guguji_rl/outputs/
+/guguji_rl/.pytest_cache/

+ 16 - 0
README.md

@@ -64,6 +64,12 @@ ros2 launch guguji_ros2 gazebo.launch.py
 ros2 launch guguji_ros2 gazebo.launch.py gui:=false
 ```
 
+If you plan to do reinforcement learning training, it is recommended to launch Gazebo in paused mode:
+
+```bash
+ros2 launch guguji_ros2 gazebo.launch.py gui:=false pause:=true
+```
+
 ## Fortress Debug Interfaces
 
 `gazebo.launch.py` now wires up the following kinds of interfaces:
@@ -72,6 +78,16 @@ ros2 launch guguji_ros2 gazebo.launch.py gui:=false
 - `/joint_states`: actual joint states from Gazebo
 - `/tf`: actual model/link poses from Gazebo
 - `/guguji/command/<joint_name>`: position command for each joint, message type `std_msgs/msg/Float64`
+- `/world/default/control`: the Gazebo world control service, usable for reset / pause / step
+
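+As a quick manual check, the service can also be called from the command line; this is a sketch using the standard `ros2 service call` syntax with the `ros_gz_interfaces/srv/ControlWorld` type listed above:
+
+```bash
+ros2 service call /world/default/control ros_gz_interfaces/srv/ControlWorld \
+  "{world_control: {reset: {all: true}}}"
+```
+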
+## Reinforcement Learning Project
+
+The repository root now contains `guguji_rl/`, which holds the reinforcement learning training and policy-serving code. The point is to keep:
+
+- the ROS 2 / Gazebo simulation code
+- the Python RL dependencies and training outputs
+
+managed separately, which makes later debugging much easier.
 
 ## Control Examples
 

+ 268 - 0
docs/guguji_biped_rl_guide.md

@@ -0,0 +1,268 @@
+# guguji Biped Robot Reinforcement Learning Training Guide
+
+This document is aimed at a first attempt at biped-robot reinforcement learning. Its goal is to take the robot that already runs under ROS 2 Humble + Gazebo Fortress and advance it, step by step, to the point where a walking policy can be trained with RL and then run online inside the ROS 2 system.
+
+## 1. Overall Goal
+
+The full goal breaks down into four layers:
+
+1. Simulation layer:
+   the robot launches reliably in Gazebo Fortress and exposes joint commands, joint states, TF, and a world reset interface.
+2. Environment layer:
+   Gazebo + ROS 2 is wrapped as a Gymnasium environment, exposing the standard RL interface of `reset()` / `step(action)` / `reward` / `done` (a minimal sketch follows this list).
+3. Training layer:
+   a policy is trained with an algorithm such as PPO, so the robot first learns to stand and then to move forward.
+4. Deployment layer:
+   the trained policy runs as an online inference program inside ROS 2, reading state and publishing joint commands.
+
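+To make the environment layer concrete, here is a minimal, hypothetical Gymnasium skeleton; the class name and space shapes are illustrative, not the actual `GazeboBipedEnv` from this commit:
+
+```python
+import gymnasium as gym
+import numpy as np
+
+
+class MinimalBipedEnv(gym.Env):
+    """Illustrative skeleton of the reset() / step() contract RL trainers expect."""
+
+    def __init__(self, num_joints: int = 8) -> None:
+        super().__init__()
+        self.action_space = gym.spaces.Box(-1.0, 1.0, shape=(num_joints,), dtype=np.float32)
+        self.observation_space = gym.spaces.Box(-np.inf, np.inf, shape=(num_joints * 2,), dtype=np.float32)
+
+    def reset(self, *, seed=None, options=None):
+        super().reset(seed=seed)
+        observation = np.zeros(self.observation_space.shape, dtype=np.float32)
+        return observation, {}
+
+    def step(self, action):
+        # A real environment would publish the action, advance the sim, and read sensors here.
+        observation = np.zeros(self.observation_space.shape, dtype=np.float32)
+        reward, terminated, truncated = 0.0, False, False
+        return observation, reward, terminated, truncated, {}
+```
+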
+## 2. How to Lay Out the Directories
+
+I recommend the layered layout this repository already uses:
+
+- `guguji_ros2_ws/src/guguji_ros2`
+  owns the robot model, Gazebo launch files, ROS 2 topics, and the simulation interfaces.
+- `guguji_rl`
+  owns the RL training code, algorithm dependencies, configuration files, and training outputs.
+
+The benefits of this split:
+
+- The ROS 2 package stays clean, free of heavyweight training dependencies.
+- The RL code can live in its own virtual environment.
+- When debugging later, you can quickly tell "simulation problems" apart from "algorithm problems".
+
+## 3. Capabilities Already Prepared for You
+
+The current code already provides the basic interfaces training needs:
+
+- `/joint_states`
+  actual joint states from Gazebo
+- `/tf`
+  actual model / link poses from Gazebo
+- `/clock`
+  the simulation clock
+- `/guguji/command/<joint_name>`
+  a position command interface for each joint
+- `/world/default/control`
+  the Gazebo world control service, usable for reset / pause / step
+
+These interfaces are enough to build a minimal reinforcement learning training environment.
+
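+Before writing any training code, you can poke a single joint by hand. This is an illustrative command; the joint name comes from the config files in this commit, and the topic layout and `std_msgs/msg/Float64` type are the ones listed above:
+
+```bash
+ros2 topic pub --once /guguji/command/left_knee_pitch_joint std_msgs/msg/Float64 "{data: 0.3}"
+```
+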
+## 4. Suggested Training Procedure
+
+For a biped robot, it is not advisable to train "walk forward" right away. A safer route is curriculum-style training.
+
+### Stage A: Interface Verification
+
+Goals:
+
+- Confirm the environment can reset
+- Confirm `/joint_states` changes after joint commands are published
+- Confirm the robot base pose can be read from `/tf`
+
+Start by running:
+
+```bash
+source /opt/ros/humble/setup.bash
+cd /home/corvin/Project/guguji_simulation/guguji_ros2_ws
+source install/setup.bash
+ros2 launch guguji_ros2 gazebo.launch.py gui:=false
+```
+
+Then run this in a second terminal:
+
+```bash
+cd /home/corvin/Project/guguji_simulation/guguji_rl
+source .venv/bin/activate
+python3 scripts/check_env.py --config configs/balance_ppo.yaml
+```
+
+If you are about to train for real, it is better to launch Gazebo like this instead:
+
+```bash
+ros2 launch guguji_ros2 gazebo.launch.py gui:=false pause:=true
+```
+
+This lets the training program advance the simulation by an exact number of steps through the world control service, making training more stable.
+
+### Stage B: Standing Balance
+
+Goals:
+
+- Learn not to fall over
+- Keep the attitude within small `roll / pitch`
+- Keep the base height as steady as possible
+
+Start with:
+
+- `configs/balance_ppo.yaml`
+- `task.target_forward_velocity = 0.0`
+
+The key in this stage is not "walking" but "staying alive and stable".
+
+### Stage C: In-Place Leg Swings / Weight Shifting
+
+Goals:
+
+- Learn left/right leg coordination without falling
+- Gradually shape a repeatable gait
+
+You can keep using the balance config in this stage, but progressively:
+
+- widen the allowed joint ranges
+- reduce the penalty on action changes
+- allow more aggressive posture changes
+
+### Stage D: Forward Walking
+
+Goals:
+
+- Track the target forward velocity
+- Minimize lateral drift
+- Keep the base stable
+
+Start from a low target velocity, for example:
+
+- `0.10 m/s`
+- `0.15 m/s`
+- `0.25 m/s`
+
+Do not chase high speed at the start, or the biped will likely learn a "lunging face-plant" gait.
+
+## 5. How to Read the Reward Function
+
+The current code already implements a reward structure suitable for early debugging:
+
+- `alive_bonus`
+  a base reward for simply not having fallen
+- `velocity_tracking`
+  encourages the forward velocity to approach the target value
+- `upright`
+  encourages the robot to stay upright
+- `height`
+  discourages the base height from collapsing
+- `action_rate_penalty`
+  penalizes actions that change too quickly
+- `joint_limit_penalty`
+  penalizes joints that sit at their limits for long periods
+- `lateral_velocity_penalty`
+  penalizes sideways drift
+- `fall_penalty`
+  an extra negative reward once the robot falls
+
+For a first biped RL project, tune only these reward families at first; do not make the reward overly complex right away.
+
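+The velocity-tracking term implemented in `rewards.py` is a Gaussian bump around the target velocity. This standalone sketch reproduces its shape, so you can see how `velocity_tracking_sigma` controls how forgiving the term is:
+
+```python
+import numpy as np
+
+
+def velocity_tracking(v: float, v_target: float, sigma: float = 0.3) -> float:
+    """Gaussian reward: 1.0 at the target velocity, decaying with the squared error."""
+    error = v - v_target
+    return float(np.exp(-(error * error) / (2.0 * sigma * sigma)))
+
+
+for v in (0.0, 0.10, 0.25, 0.50):
+    print(f'v={v:.2f} reward={velocity_tracking(v, v_target=0.25):.3f}')
+```
+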
+## 6. Why PPO First
+
+The prepared pipeline uses PPO because it:
+
+- is a common choice for continuous-action control
+- has plenty of documentation and tutorials
+- is easier to get running first in a slow, single-robot, low-parallelism setting like Gazebo
+
+Be aware, though, that single-instance Gazebo training is slow, far slower than highly parallel simulators such as Isaac Gym or MuJoCo.
+
+So the realistic expectation is:
+
+- first get the training pipeline working in Gazebo
+- first learn balance and low-speed walking
+- only consider parallel environments or a faster simulator when training throughput really matters
+
+## 7. Choosing Between CPU and GPU
+
+Your machine already has a CUDA driver, but the GPU has only about 2 GB of VRAM, so use it like this:
+
+- Early debugging:
+  use `training.device: cpu`
+- Serious training with small networks:
+  use `training.device: auto` or `cuda`
+- If VRAM runs out:
+  reduce `policy_net_arch` and `batch_size`
+
+The current config files already support:
+
+- `cpu`
+- `cuda`
+- `auto`
+
+No change to the training code is needed; just edit the YAML or pass a command-line argument, as shown below.
+
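+For example, `train.py` accepts a `--device` override, so you can switch devices without editing the YAML:
+
+```bash
+cd /home/corvin/Project/guguji_simulation/guguji_rl
+source .venv/bin/activate
+python3 scripts/train.py --config configs/balance_ppo.yaml --device cpu
+```
+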
+## 8. Code Structure Already in Place
+
+### Simulation interfaces
+
+- `guguji_ros2_ws/src/guguji_ros2/launch/gazebo.launch.py`
+  Gazebo Fortress launch and bridging
+- `guguji_ros2_ws/src/guguji_ros2/urdf/guguji.urdf`
+  Gazebo plugins, joint controllers, joint-state and pose publishers
+
+### Reinforcement learning project
+
+- `guguji_rl/guguji_rl/ros2_interface.py`
+  ROS 2 / Gazebo interface wrapper
+- `guguji_rl/guguji_rl/envs/gazebo_biped_env.py`
+  the Gymnasium environment
+- `guguji_rl/guguji_rl/rewards.py`
+  the reward functions
+- `guguji_rl/scripts/train.py`
+  PPO training
+- `guguji_rl/scripts/evaluate.py`
+  loads a policy and runs it
+- `guguji_rl/scripts/run_policy.py`
+  runs a trained policy continuously as an online controller
+- `guguji_rl/scripts/check_env.py`
+  environment sanity checks
+
+## 9. Recommended First Practice Run
+
+1. Launch the Gazebo simulation and confirm the robot appears correctly.
+2. Run `check_env.py` and confirm reset / step / reward all work.
+3. Train with `balance_ppo.yaml` (see the command sketch below); do not rush into walking training.
+4. Watch `/joint_states` and `/tf` to check whether the robot falls frequently.
+5. After adjusting rewards and joint ranges, switch to `walk_ppo.yaml`.
+
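+The training command for step 3 looks like this (paths match the commands shown in section 4):
+
+```bash
+cd /home/corvin/Project/guguji_simulation/guguji_rl
+source .venv/bin/activate
+python3 scripts/train.py --config configs/balance_ppo.yaml
+```
+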
+## 10. Continuously Controlling the Robot in ROS 2 After Training
+
+After training, proceed in two steps:
+
+1. Replay briefly with `evaluate.py` to confirm the model file is sound.
+2. Run the policy continuously with `run_policy.py`, letting it keep reading state and publishing joint commands inside the current Gazebo / ROS 2 system.
+
+Example command:
+
+```bash
+cd /home/corvin/Project/guguji_simulation/guguji_rl
+source .venv/bin/activate
+python3 scripts/run_policy.py \
+  --config configs/walk_ppo.yaml \
+  --model outputs/<your_experiment_dir>/final_model.zip \
+  --deterministic
+```
+
+## 11. Where You Will Most Likely Keep Making Changes
+
+Later on, you will most likely concentrate on these spots:
+
+- `guguji_rl/guguji_rl/rewards.py`
+  tune reward terms and weights
+- `guguji_rl/guguji_rl/envs/gazebo_biped_env.py`
+  tune the observation, done conditions, and action mapping
+- `guguji_rl/configs/*.yaml`
+  tune training hyperparameters, device, and target velocity
+- `guguji_ros2_ws/src/guguji_ros2/urdf/guguji.urdf`
+  tune joint ranges, PD gains, and Gazebo controller parameters
+
+## 12. Practical Advice
+
+For a biped, the hardest part of a first RL project is usually not the "algorithm" but these three things:
+
+- whether the action space is reasonable
+- whether the reward function is stable
+- whether reset is clean and repeatable
+
+So do not make "walk very human-like as fast as possible" your first goal; instead:
+
+- first stand still
+- then learn a regular leg swing
+- only then chase forward speed and stability
+
+This route is more robust and better suited to a first attempt.

+ 68 - 0
guguji_rl/README.md

@@ -0,0 +1,68 @@
+# guguji_rl
+
+`guguji_rl` is the reinforcement learning training project inside the `guguji_simulation` repository.
+
+## Why It Lives at the Repository Root
+
+The training code sits outside `guguji_ros2_ws` because:
+
+- `guguji_ros2_ws/src/guguji_ros2` stays focused on the robot description, Gazebo launch, and ROS 2 interfaces
+- `guguji_rl` focuses on the Python RL dependencies, e.g. `stable-baselines3`, `gymnasium`, `torch`
+- this makes it easier to debug "robot simulation problems" and "training algorithm problems" separately
+
+## What Is Currently Implemented
+
+- `configs/`: training configs with CPU / GPU switching
+- `guguji_rl/ros2_interface.py`: the ROS 2 / Gazebo training interface
+- `guguji_rl/envs/gazebo_biped_env.py`: the Gymnasium environment wrapper
+- `guguji_rl/rewards.py`: the reward functions
+- `scripts/train.py`: the PPO training entry point
+- `scripts/evaluate.py`: loads a trained policy and runs it in Gazebo
+- `scripts/run_policy.py`: runs a trained policy continuously as an online controller
+- `scripts/check_env.py`: a pre-training self-check script
+
+## Suggested Workflow
+
+1. Launch the Gazebo simulation first
+2. Run `check_env.py` to check the ROS 2 / Gazebo training interfaces
+3. Train standing balance with `balance_ppo.yaml`
+4. Then train forward walking with `walk_ppo.yaml`
+5. Once a model is trained, replay it with `evaluate.py` first, then run continuous inference control in ROS 2 with `run_policy.py`
+
+## Installing Dependencies
+
+Install in a dedicated virtual environment:
+
+```bash
+cd /home/corvin/Project/guguji_simulation/guguji_rl
+python3 -m venv .venv
+source .venv/bin/activate
+pip install -U pip
+pip install -r requirements.txt
+```
+
+To use the GPU, first confirm that `torch.cuda.is_available()` returns `True`, then set `training.device` to `cuda` or `auto` in the config file.
+
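+A quick one-liner for that check, assuming torch is installed in the active virtual environment:
+
+```bash
+python3 -c "import torch; print(torch.cuda.is_available())"
+```
+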
+## Recommended Gazebo Launch Mode for Training
+
+If you are about to start training in earnest, launch Gazebo like this:
+
+```bash
+source /opt/ros/humble/setup.bash
+cd /home/corvin/Project/guguji_simulation/guguji_ros2_ws
+source install/setup.bash
+ros2 launch guguji_ros2 gazebo.launch.py gui:=false pause:=true
+```
+
+This lets the training environment advance the simulation by an exact number of steps through `/world/default/control`, which suits reinforcement learning better.
+
+## Running a Trained Policy Online
+
+```bash
+cd /home/corvin/Project/guguji_simulation/guguji_rl
+source .venv/bin/activate
+python3 scripts/run_policy.py \
+  --config configs/walk_ppo.yaml \
+  --model outputs/<your_experiment_dir>/final_model.zip \
+  --deterministic
+```

+ 74 - 0
guguji_rl/configs/balance_ppo.yaml

@@ -0,0 +1,74 @@
+experiment:
+  name: balance_ppo
+
+robot:
+  model_name: guguji
+  urdf_path: guguji_ros2_ws/src/guguji_ros2/urdf/guguji.urdf
+  joint_names:
+    - left_hip_pitch_joint
+    - left_knee_pitch_joint
+    - left_ankle_pitch_joint
+    - left_ankle_joint
+    - right_hip_pitch_joint
+    - right_knee_pitch_joint
+    - right_ankle_pitch_joint
+    - right_ankle_joint
+  command_topic_prefix: /guguji/command
+
+ros:
+  joint_state_topic: /joint_states
+  tf_topic: /tf
+  clock_topic: /clock
+  world_control_service: /world/default/control
+
+sim:
+  world_name: default
+  # service_step suits RL training better: each step is more controllable and reproducible.
+  step_mode: service_step
+  control_dt: 0.05
+  service_step_iterations: 50
+  reset_settle_seconds: 1.0
+  action_publish_delay: 0.01
+  post_step_wait_seconds: 0.01
+  # For training, launch Gazebo paused and let the training program step the simulation.
+  launch_hint: ros2 launch guguji_ros2 gazebo.launch.py gui:=false pause:=true
+
+task:
+  target_forward_velocity: 0.0
+  target_base_height: null
+  max_roll_rad: 0.60
+  max_pitch_rad: 0.60
+  min_base_height: 0.14
+
+rewards:
+  alive_bonus: 1.0
+  velocity_tracking_scale: 1.5
+  velocity_tracking_sigma: 0.30
+  upright_scale: 2.0
+  height_scale: 1.0
+  action_rate_penalty_scale: 0.03
+  joint_limit_penalty_scale: 0.05
+  lateral_velocity_penalty_scale: 0.10
+  fall_penalty: -15.0
+
+training:
+  algorithm: ppo
+  total_timesteps: 200000
+  max_episode_steps: 400
+  seed: 42
+  device: auto
+  learning_rate: 0.0003
+  n_steps: 1024
+  batch_size: 256
+  gamma: 0.99
+  gae_lambda: 0.95
+  clip_range: 0.2
+  ent_coef: 0.0
+  vf_coef: 0.5
+  policy_net_arch: [256, 256]
+  checkpoint_freq: 20000
+  output_root: guguji_rl/outputs
+
+evaluation:
+  episodes: 3
+  deterministic: true

+ 74 - 0
guguji_rl/configs/walk_ppo.yaml

@@ -0,0 +1,74 @@
+experiment:
+  name: walk_ppo
+
+robot:
+  model_name: guguji
+  urdf_path: guguji_ros2_ws/src/guguji_ros2/urdf/guguji.urdf
+  joint_names:
+    - left_hip_pitch_joint
+    - left_knee_pitch_joint
+    - left_ankle_pitch_joint
+    - left_ankle_joint
+    - right_hip_pitch_joint
+    - right_knee_pitch_joint
+    - right_ankle_pitch_joint
+    - right_ankle_joint
+  command_topic_prefix: /guguji/command
+
+ros:
+  joint_state_topic: /joint_states
+  tf_topic: /tf
+  clock_topic: /clock
+  world_control_service: /world/default/control
+
+sim:
+  world_name: default
+  # Walking training should also use service_step, to keep step / reward alignment stable.
+  step_mode: service_step
+  control_dt: 0.05
+  service_step_iterations: 50
+  reset_settle_seconds: 1.2
+  action_publish_delay: 0.01
+  post_step_wait_seconds: 0.01
+  # For training, launch Gazebo paused and let the training program step the simulation.
+  launch_hint: ros2 launch guguji_ros2 gazebo.launch.py gui:=false pause:=true
+
+task:
+  target_forward_velocity: 0.25
+  target_base_height: null
+  max_roll_rad: 0.65
+  max_pitch_rad: 0.65
+  min_base_height: 0.12
+
+rewards:
+  alive_bonus: 1.0
+  velocity_tracking_scale: 3.0
+  velocity_tracking_sigma: 0.25
+  upright_scale: 1.5
+  height_scale: 0.8
+  action_rate_penalty_scale: 0.04
+  joint_limit_penalty_scale: 0.05
+  lateral_velocity_penalty_scale: 0.15
+  fall_penalty: -20.0
+
+training:
+  algorithm: ppo
+  total_timesteps: 500000
+  max_episode_steps: 500
+  seed: 42
+  device: auto
+  learning_rate: 0.0003
+  n_steps: 1024
+  batch_size: 256
+  gamma: 0.99
+  gae_lambda: 0.95
+  clip_range: 0.2
+  ent_coef: 0.0
+  vf_coef: 0.5
+  policy_net_arch: [256, 256]
+  checkpoint_freq: 50000
+  output_root: guguji_rl/outputs
+
+evaluation:
+  episodes: 3
+  deterministic: true

+ 1 - 0
guguji_rl/guguji_rl/__init__.py

@@ -0,0 +1 @@
+"""guguji reinforcement learning package."""

+ 112 - 0
guguji_rl/guguji_rl/config.py

@@ -0,0 +1,112 @@
+from __future__ import annotations
+
+import copy
+from pathlib import Path
+from typing import Any
+
+import yaml
+
+
+DEFAULT_CONFIG: dict[str, Any] = {
+    'experiment': {
+        'name': 'guguji_rl_experiment',
+    },
+    'robot': {
+        'model_name': 'guguji',
+        'urdf_path': 'guguji_ros2_ws/src/guguji_ros2/urdf/guguji.urdf',
+        'joint_names': [],
+        'command_topic_prefix': '/guguji/command',
+    },
+    'ros': {
+        'joint_state_topic': '/joint_states',
+        'tf_topic': '/tf',
+        'clock_topic': '/clock',
+        'world_control_service': '/world/default/control',
+    },
+    'sim': {
+        'world_name': 'default',
+        'step_mode': 'realtime',
+        'control_dt': 0.05,
+        'service_step_iterations': 50,
+        'reset_settle_seconds': 1.0,
+        'action_publish_delay': 0.01,
+        'post_step_wait_seconds': 0.01,
+    },
+    'task': {
+        'target_forward_velocity': 0.0,
+        'target_base_height': None,
+        'max_roll_rad': 0.6,
+        'max_pitch_rad': 0.6,
+        'min_base_height': 0.12,
+    },
+    'rewards': {
+        'alive_bonus': 1.0,
+        'velocity_tracking_scale': 1.0,
+        'velocity_tracking_sigma': 0.3,
+        'upright_scale': 1.0,
+        'height_scale': 1.0,
+        'action_rate_penalty_scale': 0.02,
+        'joint_limit_penalty_scale': 0.02,
+        'lateral_velocity_penalty_scale': 0.05,
+        'fall_penalty': -10.0,
+    },
+    'training': {
+        'algorithm': 'ppo',
+        'total_timesteps': 200000,
+        'max_episode_steps': 400,
+        'seed': 42,
+        'device': 'auto',
+        'learning_rate': 3e-4,
+        'n_steps': 1024,
+        'batch_size': 256,
+        'gamma': 0.99,
+        'gae_lambda': 0.95,
+        'clip_range': 0.2,
+        'ent_coef': 0.0,
+        'vf_coef': 0.5,
+        'policy_net_arch': [256, 256],
+        'checkpoint_freq': 20000,
+        'output_root': 'guguji_rl/outputs',
+    },
+    'evaluation': {
+        'episodes': 3,
+        'deterministic': True,
+    },
+}
+
+
+def _deep_update(base: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]:
+    for key, value in override.items():
+        if isinstance(value, dict) and isinstance(base.get(key), dict):
+            _deep_update(base[key], value)
+        else:
+            base[key] = value
+    return base
+
+
+def load_config(config_path: str | Path) -> dict[str, Any]:
+    config_path = Path(config_path).resolve()
+    with config_path.open('r', encoding='utf-8') as file:
+        user_config = yaml.safe_load(file) or {}
+
+    config = _deep_update(copy.deepcopy(DEFAULT_CONFIG), user_config)
+    config['meta'] = {
+        'config_path': str(config_path),
+        'project_root': str(config_path.parents[2]),
+        'rl_root': str(config_path.parents[1]),
+    }
+    return config
+
+
+def resolve_project_path(config: dict[str, Any], relative_path: str | Path) -> Path:
+    relative_path = Path(relative_path)
+    if relative_path.is_absolute():
+        return relative_path
+    return Path(config['meta']['project_root']) / relative_path
+
+
+def save_yaml(data: dict[str, Any], output_path: str | Path) -> None:
+    output_path = Path(output_path)
+    output_path.parent.mkdir(parents=True, exist_ok=True)
+    with output_path.open('w', encoding='utf-8') as file:
+        yaml.safe_dump(data, file, sort_keys=False, allow_unicode=True)

+ 3 - 0
guguji_rl/guguji_rl/envs/__init__.py

@@ -0,0 +1,3 @@
+from .gazebo_biped_env import GazeboBipedEnv
+
+__all__ = ['GazeboBipedEnv']

+ 209 - 0
guguji_rl/guguji_rl/envs/gazebo_biped_env.py

@@ -0,0 +1,209 @@
+from __future__ import annotations
+
+import time
+from typing import Any
+
+import gymnasium as gym
+import numpy as np
+
+from ..config import resolve_project_path
+from ..math_utils import quaternion_xyzw_to_euler
+from ..rewards import BipedRewardCalculator, RewardContext
+from ..ros2_interface import GugujiRos2Interface, RobotStateSnapshot
+from ..urdf_utils import JointLimit, parse_joint_limits
+
+
+class GazeboBipedEnv(gym.Env):
+    """Wrap the Gazebo + ROS 2 robot as a Gymnasium environment."""
+
+    metadata = {'render_modes': []}
+
+    def __init__(self, config: dict[str, Any]) -> None:
+        super().__init__()
+        self.config = config
+        self.robot_config = config['robot']
+        self.ros_config = config['ros']
+        self.sim_config = config['sim']
+        self.task_config = config['task']
+        self.training_config = config['training']
+
+        urdf_path = resolve_project_path(config, self.robot_config['urdf_path'])
+        self.joint_limits: list[JointLimit] = parse_joint_limits(
+            urdf_path,
+            self.robot_config['joint_names'],
+        )
+        self.joint_names = [joint_limit.name for joint_limit in self.joint_limits]
+        self.joint_lower = np.array([joint.lower for joint in self.joint_limits], dtype=np.float32)
+        self.joint_upper = np.array([joint.upper for joint in self.joint_limits], dtype=np.float32)
+        self.joint_mid = np.array([joint.midpoint for joint in self.joint_limits], dtype=np.float32)
+        self.joint_half_range = np.array([joint.half_range for joint in self.joint_limits], dtype=np.float32)
+
+        self.interface = GugujiRos2Interface(
+            joint_names=self.joint_names,
+            command_topic_prefix=self.robot_config['command_topic_prefix'],
+            joint_state_topic=self.ros_config['joint_state_topic'],
+            tf_topic=self.ros_config['tf_topic'],
+            clock_topic=self.ros_config['clock_topic'],
+            world_control_service=self.ros_config['world_control_service'],
+            model_name=self.robot_config['model_name'],
+        )
+        self.interface.wait_for_world_control_service()
+        self.reward_calculator = BipedRewardCalculator(config['rewards'])
+
+        # One normalized action per joint; mapped back to the real joint angle ranges later.
+        self.action_space = gym.spaces.Box(
+            low=-1.0,
+            high=1.0,
+            shape=(len(self.joint_names),),
+            dtype=np.float32,
+        )
+        # Observation contains joint positions/velocities, the previous action, and base attitude/velocity info.
+        self.observation_space = gym.spaces.Box(
+            low=-np.inf,
+            high=np.inf,
+            shape=(len(self.joint_names) * 3 + 5,),
+            dtype=np.float32,
+        )
+
+        self.previous_action = np.zeros(len(self.joint_names), dtype=np.float32)
+        self.previous_snapshot: RobotStateSnapshot | None = None
+        self.current_snapshot: RobotStateSnapshot | None = None
+        self.step_count = 0
+        self.target_base_height = self.task_config['target_base_height']
+
+    def _normalized_joint_position(self, joint_position: np.ndarray) -> np.ndarray:
+        return (joint_position - self.joint_mid) / self.joint_half_range
+
+    def _action_to_joint_targets(self, action: np.ndarray) -> np.ndarray:
+        clipped_action = np.clip(action, -1.0, 1.0)
+        # Scale the [-1, 1] policy output into the URDF joint limit range.
+        joint_targets = self.joint_mid + clipped_action * self.joint_half_range
+        return np.clip(joint_targets, self.joint_lower, self.joint_upper).astype(np.float32)
+
+    def _get_target_base_height(self, snapshot: RobotStateSnapshot) -> float:
+        if self.target_base_height is None:
+            self.target_base_height = float(snapshot.base_position[2])
+        return float(self.target_base_height)
+
+    def _build_observation(
+        self,
+        snapshot: RobotStateSnapshot,
+        previous_snapshot: RobotStateSnapshot,
+        previous_action: np.ndarray,
+    ) -> np.ndarray:
+        dt = max(snapshot.sim_time - previous_snapshot.sim_time, self.sim_config['control_dt'], 1e-3)
+        velocity = (snapshot.base_position - previous_snapshot.base_position) / dt
+        roll, pitch, _ = quaternion_xyzw_to_euler(snapshot.base_quaternion)
+        observation = np.concatenate(
+            [
+                self._normalized_joint_position(snapshot.joint_position),
+                snapshot.joint_velocity,
+                previous_action,
+                np.array(
+                    [
+                        snapshot.base_position[2],
+                        roll,
+                        pitch,
+                        velocity[0],
+                        self.task_config['target_forward_velocity'],
+                    ],
+                    dtype=np.float32,
+                ),
+            ],
+            dtype=np.float32,
+        )
+        return observation
+
+    def _publish_zero_action(self) -> None:
+        zero_target = {joint_name: 0.0 for joint_name in self.joint_names}
+        self.interface.publish_joint_targets(zero_target)
+
+    def _advance_simulation(self) -> None:
+        if self.sim_config['step_mode'] == 'service_step':
+            # service_step suits training better: publish the action first, then advance an exact number of steps.
+            time.sleep(float(self.sim_config['action_publish_delay']))
+            self.interface.step_world(int(self.sim_config['service_step_iterations']))
+            time.sleep(float(self.sim_config['post_step_wait_seconds']))
+        else:
+            # realtime is closer to interactive debugging, but less reproducible for training.
+            time.sleep(float(self.sim_config['control_dt']))
+
+    def _terminated(self, snapshot: RobotStateSnapshot) -> bool:
+        roll, pitch, _ = quaternion_xyzw_to_euler(snapshot.base_quaternion)
+        if abs(roll) > float(self.task_config['max_roll_rad']):
+            return True
+        if abs(pitch) > float(self.task_config['max_pitch_rad']):
+            return True
+        if float(snapshot.base_position[2]) < float(self.task_config['min_base_height']):
+            return True
+        return False
+
+    def reset(self, *, seed: int | None = None, options: dict[str, Any] | None = None):
+        super().reset(seed=seed)
+        pause_after_reset = self.sim_config['step_mode'] == 'service_step'
+        self.interface.reset_world(pause_after_reset=pause_after_reset)
+        self._publish_zero_action()
+        time.sleep(float(self.sim_config['reset_settle_seconds']))
+
+        if pause_after_reset:
+            self.interface.step_world(int(self.sim_config['service_step_iterations']))
+
+        snapshot = self.interface.wait_for_snapshot()
+        self._get_target_base_height(snapshot)
+        self.current_snapshot = snapshot
+        self.previous_snapshot = snapshot
+        self.previous_action = np.zeros(len(self.joint_names), dtype=np.float32)
+        self.step_count = 0
+
+        observation = self._build_observation(snapshot, snapshot, self.previous_action)
+        info = {
+            'reset': True,
+            'target_base_height': float(self.target_base_height),
+            'target_forward_velocity': float(self.task_config['target_forward_velocity']),
+        }
+        return observation, info
+
+    def step(self, action: np.ndarray):
+        if self.current_snapshot is None or self.previous_snapshot is None:
+            raise RuntimeError('The environment has not been reset yet; step() cannot be called.')
+
+        self.step_count += 1
+        action = np.asarray(action, dtype=np.float32)
+        joint_targets = self._action_to_joint_targets(action)
+        self.interface.publish_joint_targets(joint_targets)
+        self._advance_simulation()
+
+        self.previous_snapshot = self.current_snapshot
+        self.current_snapshot = self.interface.wait_for_snapshot()
+        terminated = self._terminated(self.current_snapshot)
+        truncated = self.step_count >= int(self.training_config['max_episode_steps'])
+
+        reward, reward_terms = self.reward_calculator.compute(
+            RewardContext(
+                current=self.current_snapshot,
+                previous=self.previous_snapshot,
+                action=action,
+                previous_action=self.previous_action,
+                joint_limits=self.joint_limits,
+                target_forward_velocity=float(self.task_config['target_forward_velocity']),
+                target_base_height=float(self.target_base_height),
+                control_dt=float(self.sim_config['control_dt']),
+                terminated=terminated,
+            )
+        )
+        # Keep the per-term reward breakdown during training to help diagnose "why it will not learn to walk".
+        observation = self._build_observation(
+            self.current_snapshot,
+            self.previous_snapshot,
+            self.previous_action,
+        )
+        self.previous_action = action.copy()
+
+        info = {
+            'joint_targets': joint_targets.tolist(),
+            'reward_terms': reward_terms,
+        }
+        return observation, reward, terminated, truncated, info
+
+    def close(self) -> None:
+        self.interface.close()

+ 30 - 0
guguji_rl/guguji_rl/math_utils.py

@@ -0,0 +1,30 @@
+from __future__ import annotations
+
+import math
+from typing import Iterable
+
+import numpy as np
+
+
+def quaternion_xyzw_to_euler(quaternion: Iterable[float]) -> tuple[float, float, float]:
+    x, y, z, w = quaternion
+
+    sinr_cosp = 2.0 * (w * x + y * z)
+    cosr_cosp = 1.0 - 2.0 * (x * x + y * y)
+    roll = math.atan2(sinr_cosp, cosr_cosp)
+
+    sinp = 2.0 * (w * y - z * x)
+    if abs(sinp) >= 1.0:
+        pitch = math.copysign(math.pi / 2.0, sinp)
+    else:
+        pitch = math.asin(sinp)
+
+    siny_cosp = 2.0 * (w * z + x * y)
+    cosy_cosp = 1.0 - 2.0 * (y * y + z * z)
+    yaw = math.atan2(siny_cosp, cosy_cosp)
+
+    return roll, pitch, yaw
+
+
+def safe_array(values: Iterable[float], dtype=np.float32) -> np.ndarray:
+    return np.asarray(list(values), dtype=dtype)

+ 74 - 0
guguji_rl/guguji_rl/rewards.py

@@ -0,0 +1,74 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+
+import numpy as np
+
+from .math_utils import quaternion_xyzw_to_euler
+from .ros2_interface import RobotStateSnapshot
+from .urdf_utils import JointLimit
+
+
+@dataclass
+class RewardContext:
+    current: RobotStateSnapshot
+    previous: RobotStateSnapshot
+    action: np.ndarray
+    previous_action: np.ndarray
+    joint_limits: list[JointLimit]
+    target_forward_velocity: float
+    target_base_height: float
+    control_dt: float
+    terminated: bool
+
+
+class BipedRewardCalculator:
+    def __init__(self, reward_config: dict) -> None:
+        self.reward_config = reward_config
+
+    def compute(self, context: RewardContext) -> tuple[float, dict[str, float]]:
+        dt = max(context.current.sim_time - context.previous.sim_time, context.control_dt, 1e-3)
+        delta_position = context.current.base_position - context.previous.base_position
+        forward_velocity = float(delta_position[0] / dt)
+        lateral_velocity = float(delta_position[1] / dt)
+
+        roll, pitch, _ = quaternion_xyzw_to_euler(context.current.base_quaternion)
+        upright_reward = np.exp(-4.0 * (roll * roll + pitch * pitch))
+        height_error = context.current.base_position[2] - context.target_base_height
+        height_reward = np.exp(-8.0 * height_error * height_error)
+
+        sigma = max(float(self.reward_config['velocity_tracking_sigma']), 1e-6)
+        velocity_error = forward_velocity - context.target_forward_velocity
+        velocity_tracking = np.exp(-(velocity_error * velocity_error) / (2.0 * sigma * sigma))
+
+        action_rate_penalty = float(np.mean(np.square(context.action - context.previous_action)))
+
+        joint_limit_penalty = 0.0
+        for index, joint_limit in enumerate(context.joint_limits):
+            normalized = abs((context.current.joint_position[index] - joint_limit.midpoint) / joint_limit.half_range)
+            joint_limit_penalty += max(normalized - 0.9, 0.0)
+        joint_limit_penalty /= max(len(context.joint_limits), 1)
+
+        reward_terms = {
+            'alive_bonus': float(self.reward_config['alive_bonus']),
+            'velocity_tracking': float(self.reward_config['velocity_tracking_scale']) * float(velocity_tracking),
+            'upright': float(self.reward_config['upright_scale']) * float(upright_reward),
+            'height': float(self.reward_config['height_scale']) * float(height_reward),
+            'action_rate_penalty': -float(self.reward_config['action_rate_penalty_scale']) * action_rate_penalty,
+            'joint_limit_penalty': -float(self.reward_config['joint_limit_penalty_scale']) * joint_limit_penalty,
+            'lateral_velocity_penalty': -float(self.reward_config['lateral_velocity_penalty_scale']) * abs(lateral_velocity),
+        }
+
+        total_reward = sum(reward_terms.values())
+        if context.terminated:
+            reward_terms['fall_penalty'] = float(self.reward_config['fall_penalty'])
+            total_reward += reward_terms['fall_penalty']
+        else:
+            reward_terms['fall_penalty'] = 0.0
+
+        reward_terms['forward_velocity'] = forward_velocity
+        reward_terms['roll'] = float(roll)
+        reward_terms['pitch'] = float(pitch)
+        reward_terms['base_height'] = float(context.current.base_position[2])
+        reward_terms['total_reward'] = float(total_reward)
+        return float(total_reward), reward_terms

+ 253 - 0
guguji_rl/guguji_rl/ros2_interface.py

@@ -0,0 +1,253 @@
+from __future__ import annotations
+
+import threading
+import time
+from dataclasses import dataclass
+from typing import Iterable
+
+import numpy as np
+import rclpy
+from rclpy.context import Context
+from rclpy.executors import SingleThreadedExecutor
+from rclpy.node import Node
+from ros_gz_interfaces.srv import ControlWorld
+from rosgraph_msgs.msg import Clock
+from sensor_msgs.msg import JointState
+from std_msgs.msg import Float64
+from tf2_msgs.msg import TFMessage
+
+
+@dataclass
+class RobotStateSnapshot:
+    sim_time: float
+    joint_position: np.ndarray
+    joint_velocity: np.ndarray
+    base_position: np.ndarray
+    base_quaternion: np.ndarray
+
+
+class _GugujiInterfaceNode(Node):
+    def __init__(
+        self,
+        *,
+        context: Context,
+        joint_names: list[str],
+        command_topic_prefix: str,
+        joint_state_topic: str,
+        tf_topic: str,
+        clock_topic: str,
+        world_control_service: str,
+        model_name: str,
+    ) -> None:
+        super().__init__('guguji_rl_interface', context=context)
+        self.joint_names = joint_names
+        self.model_name = model_name
+        self.command_publishers = {
+            joint_name: self.create_publisher(
+                Float64,
+                f'{command_topic_prefix}/{joint_name}',
+                10,
+            )
+            for joint_name in joint_names
+        }
+
+        self._lock = threading.Lock()
+        self._latest_joint_state: JointState | None = None
+        self._latest_tf: TFMessage | None = None
+        self._latest_clock: Clock | None = None
+
+        self.create_subscription(JointState, joint_state_topic, self._joint_state_callback, 10)
+        self.create_subscription(TFMessage, tf_topic, self._tf_callback, 50)
+        self.create_subscription(Clock, clock_topic, self._clock_callback, 10)
+        self.world_control_client = self.create_client(ControlWorld, world_control_service)
+
+    def _joint_state_callback(self, message: JointState) -> None:
+        with self._lock:
+            self._latest_joint_state = message
+
+    def _tf_callback(self, message: TFMessage) -> None:
+        with self._lock:
+            self._latest_tf = message
+
+    def _clock_callback(self, message: Clock) -> None:
+        with self._lock:
+            self._latest_clock = message
+
+    def snapshot(self) -> RobotStateSnapshot | None:
+        with self._lock:
+            joint_state = self._latest_joint_state
+            tf_message = self._latest_tf
+            clock_message = self._latest_clock
+
+        if joint_state is None:
+            return None
+
+        joint_map = {name: index for index, name in enumerate(joint_state.name)}
+        joint_position = np.zeros(len(self.joint_names), dtype=np.float32)
+        joint_velocity = np.zeros(len(self.joint_names), dtype=np.float32)
+
+        for output_index, joint_name in enumerate(self.joint_names):
+            source_index = joint_map.get(joint_name)
+            if source_index is None:
+                continue
+            if source_index < len(joint_state.position):
+                joint_position[output_index] = joint_state.position[source_index]
+            if source_index < len(joint_state.velocity):
+                joint_velocity[output_index] = joint_state.velocity[source_index]
+
+        base_position = np.zeros(3, dtype=np.float32)
+        base_quaternion = np.array([0.0, 0.0, 0.0, 1.0], dtype=np.float32)
+
+        if tf_message is not None:
+            for transform in tf_message.transforms:
+                if transform.child_frame_id == self.model_name:
+                    base_position = np.array(
+                        [
+                            transform.transform.translation.x,
+                            transform.transform.translation.y,
+                            transform.transform.translation.z,
+                        ],
+                        dtype=np.float32,
+                    )
+                    base_quaternion = np.array(
+                        [
+                            transform.transform.rotation.x,
+                            transform.transform.rotation.y,
+                            transform.transform.rotation.z,
+                            transform.transform.rotation.w,
+                        ],
+                        dtype=np.float32,
+                    )
+                    break
+
+        sim_time = 0.0
+        if clock_message is not None:
+            sim_time = clock_message.clock.sec + clock_message.clock.nanosec * 1e-9
+
+        return RobotStateSnapshot(
+            sim_time=sim_time,
+            joint_position=joint_position,
+            joint_velocity=joint_velocity,
+            base_position=base_position,
+            base_quaternion=base_quaternion,
+        )
+
+
+class GugujiRos2Interface:
+    """Wrap the ROS 2 communication interfaces required by the RL environment."""
+
+    def __init__(
+        self,
+        *,
+        joint_names: list[str],
+        command_topic_prefix: str,
+        joint_state_topic: str,
+        tf_topic: str,
+        clock_topic: str,
+        world_control_service: str,
+        model_name: str,
+    ) -> None:
+        self._context = Context()
+        # The trainer uses its own context to avoid interfering with external ROS 2 processes.
+        rclpy.init(args=None, context=self._context)
+
+        self._node = _GugujiInterfaceNode(
+            context=self._context,
+            joint_names=joint_names,
+            command_topic_prefix=command_topic_prefix,
+            joint_state_topic=joint_state_topic,
+            tf_topic=tf_topic,
+            clock_topic=clock_topic,
+            world_control_service=world_control_service,
+            model_name=model_name,
+        )
+
+        self._executor = SingleThreadedExecutor(context=self._context)
+        self._executor.add_node(self._node)
+        # Spin continuously in a dedicated thread so the training loop can focus on step / reward.
+        self._spin_thread = threading.Thread(target=self._executor.spin, daemon=True)
+        self._spin_thread.start()
+
+    def wait_for_world_control_service(self, timeout: float = 10.0) -> None:
+        deadline = time.time() + timeout
+        while time.time() < deadline:
+            if self._node.world_control_client.wait_for_service(timeout_sec=0.2):
+                return
+        raise TimeoutError('Timed out waiting for the Gazebo world control service.')
+
+    def wait_for_snapshot(self, timeout: float = 5.0) -> RobotStateSnapshot:
+        deadline = time.time() + timeout
+        while time.time() < deadline:
+            snapshot = self._node.snapshot()
+            if snapshot is not None:
+                # The snapshot aggregates the joint_states / tf / clock data training needs into one structured state.
+                return snapshot
+            time.sleep(0.05)
+        raise TimeoutError('Timed out waiting for joint_states / tf data.')
+
+    def publish_joint_targets(self, joint_targets: dict[str, float] | Iterable[float]) -> None:
+        if isinstance(joint_targets, dict):
+            target_map = joint_targets
+        else:
+            target_map = {
+                joint_name: float(value)
+                for joint_name, value in zip(self._node.joint_names, joint_targets)
+            }
+
+        for joint_name, target_value in target_map.items():
+            publisher = self._node.command_publishers[joint_name]
+            message = Float64()
+            message.data = float(target_value)
+            publisher.publish(message)
+
+    def control_world(
+        self,
+        *,
+        pause: bool | None = None,
+        step: bool = False,
+        multi_step: int = 0,
+        reset_all: bool = False,
+        reset_model_only: bool = False,
+        reset_time_only: bool = False,
+        timeout: float = 5.0,
+    ) -> bool:
+        request = ControlWorld.Request()
+        if pause is not None:
+            request.world_control.pause = pause
+        request.world_control.step = step
+        request.world_control.multi_step = int(multi_step)
+        request.world_control.reset.all = reset_all
+        request.world_control.reset.model_only = reset_model_only
+        request.world_control.reset.time_only = reset_time_only
+
+        future = self._node.world_control_client.call_async(request)
+        deadline = time.time() + timeout
+
+        while time.time() < deadline:
+            if future.done():
+                response = future.result()
+                return bool(response.success)
+            time.sleep(0.01)
+
+        raise TimeoutError('Timed out calling the Gazebo world control service.')
+
+    def reset_world(self, *, pause_after_reset: bool) -> bool:
+        return self.control_world(
+            pause=pause_after_reset,
+            reset_all=True,
+        )
+
+    def step_world(self, multi_step: int) -> bool:
+        return self.control_world(
+            pause=True,
+            step=True,
+            multi_step=multi_step,
+        )
+
+    def close(self) -> None:
+        try:
+            self._executor.shutdown()
+        finally:
+            self._node.destroy_node()
+            rclpy.shutdown(context=self._context)
+            self._spin_thread.join(timeout=1.0)

+ 66 - 0
guguji_rl/guguji_rl/urdf_utils.py

@@ -0,0 +1,66 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Iterable
+from xml.etree import ElementTree as ET
+
+
+@dataclass(frozen=True)
+class JointLimit:
+    name: str
+    lower: float
+    upper: float
+    effort: float
+    velocity: float
+
+    @property
+    def midpoint(self) -> float:
+        return (self.lower + self.upper) * 0.5
+
+    @property
+    def half_range(self) -> float:
+        return max((self.upper - self.lower) * 0.5, 1e-6)
+
+
+def parse_joint_limits(urdf_path: str | Path, joint_names: Iterable[str] | None = None) -> list[JointLimit]:
+    urdf_path = Path(urdf_path)
+    root = ET.parse(urdf_path).getroot()
+    expected_names = set(joint_names or [])
+    result: list[JointLimit] = []
+
+    for joint_element in root.findall('joint'):
+        joint_type = joint_element.attrib.get('type', '')
+        if joint_type not in {'revolute', 'continuous'}:
+            continue
+
+        name = joint_element.attrib['name']
+        if expected_names and name not in expected_names:
+            continue
+
+        limit_element = joint_element.find('limit')
+        if limit_element is None:
+            continue
+
+        lower = float(limit_element.attrib.get('lower', '-3.14159'))
+        upper = float(limit_element.attrib.get('upper', '3.14159'))
+        effort = float(limit_element.attrib.get('effort', '0.0'))
+        velocity = float(limit_element.attrib.get('velocity', '0.0'))
+        result.append(
+            JointLimit(
+                name=name,
+                lower=lower,
+                upper=upper,
+                effort=effort,
+                velocity=velocity,
+            )
+        )
+
+    if expected_names:
+        found_names = {item.name for item in result}
+        missing_names = expected_names - found_names
+        if missing_names:
+            missing_text = ', '.join(sorted(missing_names))
+            raise ValueError(f'Joint limits missing from the URDF: {missing_text}')
+
+    return result

+ 22 - 0
guguji_rl/pyproject.toml

@@ -0,0 +1,22 @@
+[build-system]
+requires = ["setuptools>=68", "wheel"]
+build-backend = "setuptools.build_meta"
+
+[project]
+name = "guguji-rl"
+version = "0.1.0"
+description = "Reinforcement learning training stack for the guguji biped robot"
+readme = "README.md"
+requires-python = ">=3.10"
+dependencies = [
+  "gymnasium>=0.29",
+  "numpy>=1.26",
+  "PyYAML>=6.0",
+  "rich>=13.7",
+  "stable-baselines3>=2.3",
+  "tensorboard>=2.14",
+]
+
+[tool.setuptools.packages.find]
+where = ["."]
+include = ["guguji_rl*"]

+ 9 - 0
guguji_rl/requirements.txt

@@ -0,0 +1,9 @@
+# Notes:
+# 1. torch is deliberately not pinned here, so you can install it to match your CPU / CUDA setup.
+# 2. To use the GPU, install a CUDA-enabled torch first, following the official PyTorch instructions.
+gymnasium>=0.29
+numpy>=1.26
+PyYAML>=6.0
+rich>=13.7
+stable-baselines3>=2.3
+tensorboard>=2.14

+ 65 - 0
guguji_rl/scripts/check_env.py

@@ -0,0 +1,65 @@
+from __future__ import annotations
+
+import argparse
+import sys
+from pathlib import Path
+
+import numpy as np
+
+SCRIPT_ROOT = Path(__file__).resolve().parents[1]
+if str(SCRIPT_ROOT) not in sys.path:
+    sys.path.insert(0, str(SCRIPT_ROOT))
+
+from guguji_rl.config import load_config
+
+
+def parse_args() -> argparse.Namespace:
+    parser = argparse.ArgumentParser(description='Sanity check for the Gazebo RL environment.')
+    parser.add_argument('--config', default='configs/balance_ppo.yaml', help='Path to the config file')
+    parser.add_argument('--steps', type=int, default=5, help='How many steps to run during the check')
+    return parser.parse_args()
+
+
+def resolve_input_path(path_str: str) -> Path:
+    path = Path(path_str)
+    if path.is_absolute() or path.exists():
+        return path
+    return SCRIPT_ROOT / path
+
+
+def main() -> int:
+    args = parse_args()
+
+    try:
+        from guguji_rl.envs import GazeboBipedEnv
+    except ImportError:
+        print('Missing training environment dependencies; install requirements.txt inside guguji_rl first', file=sys.stderr)
+        return 1
+
+    config = load_config(resolve_input_path(args.config))
+    env = GazeboBipedEnv(config)
+
+    observation, info = env.reset()
+    print('reset ok')
+    print(f'observation shape: {observation.shape}')
+    print(f'target_base_height: {info["target_base_height"]:.4f}')
+
+    for step_index in range(args.steps):
+        action = np.zeros(env.action_space.shape[0], dtype=np.float32)
+        observation, reward, terminated, truncated, info = env.step(action)
+        reward_terms = info['reward_terms']
+        print(
+            f'step={step_index} reward={reward:.3f} '
+            f'vx={reward_terms["forward_velocity"]:.3f} '
+            f'base_z={reward_terms["base_height"]:.3f}'
+        )
+        if terminated or truncated:
+            print(f'episode finished early: terminated={terminated} truncated={truncated}')
+            break
+
+    env.close()
+    return 0
+
+
+if __name__ == '__main__':
+    raise SystemExit(main())

+ 66 - 0
guguji_rl/scripts/evaluate.py

@@ -0,0 +1,66 @@
+from __future__ import annotations
+
+import argparse
+import sys
+from pathlib import Path
+
+SCRIPT_ROOT = Path(__file__).resolve().parents[1]
+if str(SCRIPT_ROOT) not in sys.path:
+    sys.path.insert(0, str(SCRIPT_ROOT))
+
+from guguji_rl.config import load_config
+
+
+def parse_args() -> argparse.Namespace:
+    parser = argparse.ArgumentParser(description='Evaluate trained PPO policy in Gazebo.')
+    parser.add_argument('--config', default='configs/walk_ppo.yaml', help='Path to the config file')
+    parser.add_argument('--model', required=True, help='Path to the trained model, e.g. outputs/.../final_model.zip')
+    parser.add_argument('--episodes', type=int, default=None, help='Optionally override the number of evaluation episodes in the config')
+    return parser.parse_args()
+
+
+def resolve_input_path(path_str: str) -> Path:
+    path = Path(path_str)
+    if path.is_absolute() or path.exists():
+        return path
+    return SCRIPT_ROOT / path
+
+
+def main() -> int:
+    args = parse_args()
+
+    try:
+        from stable_baselines3 import PPO
+    except ImportError:
+        print('Missing stable-baselines3; install requirements.txt first', file=sys.stderr)
+        return 1
+
+    from guguji_rl.envs import GazeboBipedEnv
+
+    config = load_config(resolve_input_path(args.config))
+    if args.episodes is not None:
+        config['evaluation']['episodes'] = args.episodes
+
+    env = GazeboBipedEnv(config)
+    model = PPO.load(resolve_input_path(args.model))
+
+    for episode_index in range(int(config['evaluation']['episodes'])):
+        observation, info = env.reset()
+        done = False
+        truncated = False
+        total_reward = 0.0
+        while not done and not truncated:
+            action, _ = model.predict(
+                observation,
+                deterministic=bool(config['evaluation']['deterministic']),
+            )
+            observation, reward, done, truncated, info = env.step(action)
+            total_reward += reward
+        print(f'episode={episode_index} total_reward={total_reward:.3f}')
+
+    env.close()
+    return 0
+
+
+if __name__ == '__main__':
+    raise SystemExit(main())

+ 87 - 0
guguji_rl/scripts/run_policy.py

@@ -0,0 +1,87 @@
+from __future__ import annotations
+
+import argparse
+import sys
+from pathlib import Path
+
+SCRIPT_ROOT = Path(__file__).resolve().parents[1]
+if str(SCRIPT_ROOT) not in sys.path:
+    sys.path.insert(0, str(SCRIPT_ROOT))
+
+from guguji_rl.config import load_config
+
+
+def parse_args() -> argparse.Namespace:
+    parser = argparse.ArgumentParser(description='Run a trained PPO policy online in Gazebo / ROS 2.')
+    parser.add_argument('--config', default='configs/walk_ppo.yaml', help='Path to the config file')
+    parser.add_argument('--model', required=True, help='Path to the trained model, e.g. outputs/.../final_model.zip')
+    parser.add_argument(
+        '--deterministic',
+        action='store_true',
+        help='Force deterministic actions; suitable for policy replay and deployment debugging',
+    )
+    parser.add_argument(
+        '--max-episodes',
+        type=int,
+        default=0,
+        help='Maximum number of episodes to run; 0 means loop forever',
+    )
+    return parser.parse_args()
+
+
+def resolve_input_path(path_str: str) -> Path:
+    path = Path(path_str)
+    if path.is_absolute() or path.exists():
+        return path
+    return SCRIPT_ROOT / path
+
+
+def main() -> int:
+    args = parse_args()
+
+    try:
+        from stable_baselines3 import PPO
+    except ImportError:
+        print('Missing stable-baselines3; install requirements.txt first', file=sys.stderr)
+        return 1
+
+    from guguji_rl.envs import GazeboBipedEnv
+
+    config = load_config(resolve_input_path(args.config))
+
+    env = GazeboBipedEnv(config)
+    model = PPO.load(resolve_input_path(args.model))
+
+    episode_index = 0
+    try:
+        while args.max_episodes <= 0 or episode_index < args.max_episodes:
+            # Reuse the training environment directly so policy replay and later online deployment share the same observation/action logic.
+            observation, _ = env.reset()
+            terminated = False
+            truncated = False
+            total_reward = 0.0
+
+            while not terminated and not truncated:
+                action, _ = model.predict(
+                    observation,
+                    deterministic=args.deterministic or bool(config['evaluation']['deterministic']),
+                )
+                observation, reward, terminated, truncated, _ = env.step(action)
+                total_reward += reward
+
+            print(
+                f'episode={episode_index} '
+                f'total_reward={total_reward:.3f} '
+                f'terminated={terminated} truncated={truncated}'
+            )
+            episode_index += 1
+    except KeyboardInterrupt:
+        print('Received Ctrl+C; stopping the online policy run.')
+    finally:
+        env.close()
+
+    return 0
+
+
+if __name__ == '__main__':
+    raise SystemExit(main())

+ 131 - 0
guguji_rl/scripts/train.py

@@ -0,0 +1,131 @@
+from __future__ import annotations
+
+import argparse
+import sys
+from datetime import datetime
+from pathlib import Path
+
+SCRIPT_ROOT = Path(__file__).resolve().parents[1]
+if str(SCRIPT_ROOT) not in sys.path:
+    sys.path.insert(0, str(SCRIPT_ROOT))
+
+import torch
+
+from guguji_rl.config import load_config, resolve_project_path, save_yaml
+
+
+def resolve_device(device_name: str) -> str:
+    if device_name == 'auto':
+        return 'cuda' if torch.cuda.is_available() else 'cpu'
+    if device_name == 'cuda' and not torch.cuda.is_available():
+        raise RuntimeError('The config requests CUDA, but torch cannot detect a usable GPU.')
+    return device_name
+
+
+def parse_args() -> argparse.Namespace:
+    parser = argparse.ArgumentParser(description='Train PPO policy for guguji biped robot.')
+    parser.add_argument(
+        '--config',
+        default='configs/balance_ppo.yaml',
+        help='Path to the training config file; defaults to balance_ppo.yaml',
+    )
+    parser.add_argument(
+        '--device',
+        default=None,
+        help='Optionally override the device in the config file, e.g. cpu / cuda / auto',
+    )
+    parser.add_argument(
+        '--total-timesteps',
+        type=int,
+        default=None,
+        help='Optionally override total_timesteps in the config file',
+    )
+    return parser.parse_args()
+
+
+def resolve_input_path(path_str: str) -> Path:
+    path = Path(path_str)
+    if path.is_absolute() or path.exists():
+        return path
+    return SCRIPT_ROOT / path
+
+
+def main() -> int:
+    args = parse_args()
+
+    try:
+        from stable_baselines3 import PPO
+        from stable_baselines3.common.callbacks import CheckpointCallback
+        from stable_baselines3.common.monitor import Monitor
+    except ImportError:
+        print(
+            'Missing stable-baselines3; install the dependencies inside guguji_rl first: '
+            'pip install -r requirements.txt',
+            file=sys.stderr,
+        )
+        return 1
+
+    from guguji_rl.envs import GazeboBipedEnv
+
+    config = load_config(resolve_input_path(args.config))
+    if args.device is not None:
+        config['training']['device'] = args.device
+    if args.total_timesteps is not None:
+        config['training']['total_timesteps'] = args.total_timesteps
+
+    # Resolve the training device in one place, so switching CPU / GPU only needs a YAML change.
+    config['training']['device'] = resolve_device(config['training']['device'])
+
+    output_root = resolve_project_path(config, config['training']['output_root'])
+    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
+    run_dir = output_root / f"{config['experiment']['name']}_{timestamp}"
+    run_dir.mkdir(parents=True, exist_ok=True)
+    # Save the fully resolved config so the experiment can be reproduced later.
+    save_yaml(config, run_dir / 'resolved_config.yaml')
+
+    env = Monitor(GazeboBipedEnv(config))
+    checkpoint_callback = CheckpointCallback(
+        save_freq=max(int(config['training']['checkpoint_freq']), 1),
+        save_path=str(run_dir / 'checkpoints'),
+        name_prefix='guguji_ppo',
+    )
+
+    policy_kwargs = {
+        'net_arch': list(config['training']['policy_net_arch']),
+    }
+
+    print(f"Training device: {config['training']['device']}")
+    print(f"Output directory: {run_dir}")
+
+    # Start with MLP + PPO to close the training loop; scale the network up gradually later.
+    model = PPO(
+        policy='MlpPolicy',
+        env=env,
+        verbose=1,
+        seed=int(config['training']['seed']),
+        learning_rate=float(config['training']['learning_rate']),
+        n_steps=int(config['training']['n_steps']),
+        batch_size=int(config['training']['batch_size']),
+        gamma=float(config['training']['gamma']),
+        gae_lambda=float(config['training']['gae_lambda']),
+        clip_range=float(config['training']['clip_range']),
+        ent_coef=float(config['training']['ent_coef']),
+        vf_coef=float(config['training']['vf_coef']),
+        device=config['training']['device'],
+        tensorboard_log=str(run_dir / 'tensorboard'),
+        policy_kwargs=policy_kwargs,
+    )
+
+    model.learn(
+        total_timesteps=int(config['training']['total_timesteps']),
+        callback=checkpoint_callback,
+        progress_bar=True,
+    )
+    model.save(run_dir / 'final_model')
+    env.close()
+    print(f'Training finished; model saved to: {run_dir / "final_model.zip"}')
+    return 0
+
+
+if __name__ == '__main__':
+    raise SystemExit(main())

+ 2 - 0
guguji_ros2_ws/src/guguji_ros2/launch/gazebo.launch.py

@@ -77,6 +77,8 @@ def _create_bridge_node(context, *args, **kwargs):
         '/clock@rosgraph_msgs/msg/Clock[gz.msgs.Clock',
         # Joint states: Gazebo -> ROS
         f'/world/{world_name}/model/guguji/joint_state@sensor_msgs/msg/JointState[gz.msgs.Model',
+        # World control service: used by RL training for reset / pause / single-step advance
+        f'/world/{world_name}/control@ros_gz_interfaces/srv/ControlWorld',
     ]
 
     remappings = [

+ 1 - 0
guguji_ros2_ws/src/guguji_ros2/package.xml

@@ -15,6 +15,7 @@
   <exec_depend>robot_state_publisher</exec_depend>
   <exec_depend>ros2launch</exec_depend>
   <exec_depend>ros_gz_bridge</exec_depend>
+  <exec_depend>ros_gz_interfaces</exec_depend>
   <exec_depend>ros_gz_sim</exec_depend>
   <exec_depend>rosgraph_msgs</exec_depend>
   <exec_depend>rviz2</exec_depend>