from __future__ import annotations

import argparse
import copy
import sys
from datetime import datetime
from pathlib import Path
from typing import Any

SCRIPT_ROOT = Path(__file__).resolve().parents[1]
if str(SCRIPT_ROOT) not in sys.path:
    sys.path.insert(0, str(SCRIPT_ROOT))

import torch

from guguji_rl.config import load_config, resolve_project_path, save_yaml
from guguji_rl.evaluation import evaluate_forward_progress, print_forward_progress_summary

def resolve_device(device_name: str) -> str:
    if device_name == 'auto':
        return 'cuda' if torch.cuda.is_available() else 'cpu'
    if device_name == 'cuda' and not torch.cuda.is_available():
        raise RuntimeError('Config requests CUDA, but torch cannot detect an available GPU.')
    return device_name

def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description='Train PPO policy for guguji biped robot.')
    parser.add_argument(
        '--config',
        default='configs/balance_ppo.yaml',
        help='Path to the training config file; defaults to balance_ppo.yaml.',
    )
    parser.add_argument(
        '--device',
        default=None,
        help='Optionally override the device from the config file, e.g. cpu / cuda / auto.',
    )
    parser.add_argument(
        '--total-timesteps',
        type=int,
        default=None,
        help='Optionally override total_timesteps from the config file.',
    )
    parser.add_argument(
        '--init-model',
        default=None,
        help='Optionally point to an existing PPO model to resume training from or to initialize curriculum learning.',
    )
    parser.add_argument(
        '--skip-auto-eval',
        action='store_true',
        help='Skip the automatic forward-progress evaluation after training.',
    )
    return parser.parse_args()

def resolve_input_path(path_str: str) -> Path:
    path = Path(path_str)
    if path.is_absolute() or path.exists():
        return path
    return SCRIPT_ROOT / path

def maybe_override_policy_log_std(model: object, initial_log_std: float | None) -> None:
    """Optionally shrink PPO's initial exploration variance, e.g. for fine-tuning after a curriculum stage."""
    if initial_log_std is None:
        return
    policy = getattr(model, 'policy', None)
    if policy is None or not hasattr(policy, 'log_std'):
        raise RuntimeError('The current policy object does not support setting log_std directly.')
    # Set the log standard deviation of every action dimension to the same value.
    # This lowers exploration noise when continuing training on top of an existing
    # gait, reducing pointless flailing.
    policy.log_std.data.fill_(float(initial_log_std))
    print(f'Set initial policy log_std to: {float(initial_log_std):.3f}')

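# Note on scale: the sampling std is exp(log_std), so e.g. initial_log_std = -1.0
# gives std ≈ 0.37 per action dimension, versus std = 1.0 at stable-baselines3's
# default log_std of 0. The -1.0 value here is only an illustrative choice.
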
def sanitize_stage_name(stage_name: str) -> str:
    sanitized = ''.join(
        character if character.isalnum() or character in {'-', '_'} else '_'
        for character in stage_name.strip()
    )
    return sanitized.strip('_') or 'stage'

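# For example: sanitize_stage_name('slow walk (v0.2)') -> 'slow_walk__v0_2',
# and sanitize_stage_name('!!!') falls back to 'stage'.
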
def build_curriculum_stage_configs(config: dict[str, Any]) -> list[tuple[str | None, dict[str, Any]]]:
    """Expand curriculum stages into a list of independent, directly trainable configs."""
    raw_stages = config['training'].get('curriculum_stages') or []
    if not raw_stages:
        single_stage_config = copy.deepcopy(config)
        single_stage_config['training'].pop('curriculum_stages', None)
        return [(None, single_stage_config)]
    stage_configs: list[tuple[str | None, dict[str, Any]]] = []
    for stage_index, raw_stage in enumerate(raw_stages, start=1):
        if not isinstance(raw_stage, dict):
            raise RuntimeError('Every stage in training.curriculum_stages must be a dict.')
        stage_config = copy.deepcopy(config)
        stage_config['training'].pop('curriculum_stages', None)
        raw_name = str(raw_stage.get('name') or f'stage_{stage_index}')
        stage_name = f'{stage_index:02d}_{sanitize_stage_name(raw_name)}'
        # A curriculum stage currently controls the target forward velocity, the
        # stage's training timesteps, and the exploration variance. This lets the
        # walking stages ramp the target speed up gradually instead of jumping
        # straight to the final value.
        if 'target_forward_velocity' in raw_stage:
            stage_config['task']['target_forward_velocity'] = float(raw_stage['target_forward_velocity'])
        if 'total_timesteps' in raw_stage:
            stage_config['training']['total_timesteps'] = int(raw_stage['total_timesteps'])
        if 'initial_log_std' in raw_stage:
            stage_config['training']['initial_log_std'] = float(raw_stage['initial_log_std'])
        stage_config['experiment']['name'] = f"{config['experiment']['name']}_{stage_name}"
        stage_configs.append((stage_name, stage_config))
    return stage_configs

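# A hypothetical YAML snippet (values are illustrative, not shipped with the repo)
# showing the stage shape this function consumes; only the keys read above matter:
#
#   training:
#     curriculum_stages:
#       - name: slow_walk
#         target_forward_velocity: 0.2
#         total_timesteps: 500000
#         initial_log_std: -0.5
#       - name: fast_walk
#         target_forward_velocity: 0.5
#         total_timesteps: 1000000
#         initial_log_std: -1.0
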
def main() -> int:
    args = parse_args()
    try:
        from stable_baselines3 import PPO
        from stable_baselines3.common.callbacks import CheckpointCallback
        from stable_baselines3.common.monitor import Monitor
    except ImportError:
        print(
            'stable-baselines3 is missing; install the dependencies from the '
            'guguji_rl directory first: pip install -r requirements.txt',
            file=sys.stderr,
        )
        return 1
    from guguji_rl.envs import GazeboBipedEnv
    config = load_config(resolve_input_path(args.config))
    if args.device is not None:
        config['training']['device'] = args.device
    if args.total_timesteps is not None:
        config['training']['total_timesteps'] = args.total_timesteps
    if args.init_model is not None:
        config['training']['init_model_path'] = str(resolve_input_path(args.init_model))
    # Resolve the training device in one place so switching between CPU and GPU
    # only requires editing the YAML.
    config['training']['device'] = resolve_device(config['training']['device'])
    output_root = resolve_project_path(config, config['training']['output_root'])
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    run_dir = output_root / f"{config['experiment']['name']}_{timestamp}"
    run_dir.mkdir(parents=True, exist_ok=True)
    # Save the fully resolved config so the experiment can be reproduced later.
    save_yaml(config, run_dir / 'resolved_config.yaml')
    stage_configs = build_curriculum_stage_configs(config)
    print(f"Training device: {config['training']['device']}")
    print(f"Output directory: {run_dir}")
    model = None
    final_model_path = run_dir / 'final_model'
    final_stage_config = config
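    # Expected layout under run_dir after a two-stage curriculum run (derived
    # from the code below; stage names are illustrative):
    #   resolved_config.yaml
    #   tensorboard/
    #   01_<stage>/  resolved_config.yaml, checkpoints/, final_model.zip
    #   02_<stage>/  ...
    #   final_model.zip   (copy of the last stage's model)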
    for stage_index, (stage_name, stage_config) in enumerate(stage_configs, start=1):
        stage_dir = run_dir if stage_name is None else run_dir / stage_name
        stage_dir.mkdir(parents=True, exist_ok=True)
        # Save the effective config for each stage separately; handy when
        # reviewing experiments later.
        save_yaml(stage_config, stage_dir / 'resolved_config.yaml')
        if stage_name is not None:
            print(
                f'Starting curriculum stage {stage_index}/{len(stage_configs)}: {stage_name} '
                f'(target_forward_velocity={stage_config["task"]["target_forward_velocity"]:.2f}, '
                f'timesteps={int(stage_config["training"]["total_timesteps"])})'
            )
        env = Monitor(GazeboBipedEnv(stage_config))
        checkpoint_callback = CheckpointCallback(
            save_freq=max(int(stage_config['training']['checkpoint_freq']), 1),
            save_path=str(stage_dir / 'checkpoints'),
            name_prefix='guguji_ppo',
        )
        try:
            if model is None:
                policy_kwargs = {
                    'net_arch': list(stage_config['training']['policy_net_arch']),
                }
                # Start with an MLP policy and PPO to get the training loop
                # working end to end; the network can be scaled up later.
                model = PPO(
                    policy='MlpPolicy',
                    env=env,
                    verbose=1,
                    seed=int(stage_config['training']['seed']),
                    learning_rate=float(stage_config['training']['learning_rate']),
                    n_steps=int(stage_config['training']['n_steps']),
                    batch_size=int(stage_config['training']['batch_size']),
                    gamma=float(stage_config['training']['gamma']),
                    gae_lambda=float(stage_config['training']['gae_lambda']),
                    clip_range=float(stage_config['training']['clip_range']),
                    ent_coef=float(stage_config['training']['ent_coef']),
                    vf_coef=float(stage_config['training']['vf_coef']),
                    device=stage_config['training']['device'],
                    tensorboard_log=str(run_dir / 'tensorboard'),
                    policy_kwargs=policy_kwargs,
                )
                init_model_path = stage_config['training'].get('init_model_path')
                if init_model_path:
                    resolved_init_model_path = resolve_input_path(str(init_model_path))
                    # Instead of loading the whole PPO object, pour the old
                    # model's parameters into the new one. This keeps the
                    # hyperparameters from the current config file while reusing
                    # the previously learned policy weights.
                    model.set_parameters(
                        str(resolved_init_model_path),
                        exact_match=False,
                        device=stage_config['training']['device'],
                    )
                    print(f'Loaded curriculum init model: {resolved_init_model_path}')
            else:
                model.set_env(env)
            maybe_override_policy_log_std(model, stage_config['training'].get('initial_log_std'))
            model.learn(
                total_timesteps=int(stage_config['training']['total_timesteps']),
                callback=checkpoint_callback,
                progress_bar=True,
                reset_num_timesteps=(stage_index == 1),
            )
            stage_model_path = stage_dir / 'final_model'
            model.save(stage_model_path)
            final_model_path = stage_model_path
            final_stage_config = stage_config
            if stage_name is not None:
                print(f'Curriculum stage finished; model saved to: {stage_model_path.with_suffix(".zip")}')
        finally:
            env.close()
    if final_model_path != run_dir / 'final_model' and model is not None:
        # In curriculum mode, also save a copy of the final model at the run
        # root so it can be referenced uniformly.
        model.save(run_dir / 'final_model')
        final_model_path = run_dir / 'final_model'
    print(f'Training finished; model saved to: {run_dir / "final_model.zip"}')
    evaluation_config = final_stage_config['evaluation']
    if bool(evaluation_config.get('auto_forward_progress', True)) and not args.skip_auto_eval:
        try:
            # Run a forward-progress evaluation automatically after each
            # training run so delta_x / mean_vx can be checked at a glance.
            summary = evaluate_forward_progress(
                config=final_stage_config,
                model_path=final_model_path,
                episodes=int(evaluation_config['forward_progress_episodes']),
                max_steps=int(evaluation_config['forward_progress_max_steps']),
                deterministic=bool(evaluation_config['forward_progress_deterministic']),
            )
            print_forward_progress_summary(summary)
        except Exception as error:
            print(f'Automatic forward-progress evaluation failed: {error}', file=sys.stderr)
    return 0

if __name__ == '__main__':
    raise SystemExit(main())
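
# Example invocations (the script path and output directory are illustrative;
# the flags match parse_args above):
#   python scripts/train.py --config configs/balance_ppo.yaml --device auto
#   python scripts/train.py --init-model outputs/balance_ppo_20240101_000000/final_model \
#       --total-timesteps 2000000 --skip-auto-eval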