脚本用途

脚本路径:scripts/export_rosbag_images.py

脚本本体没有写进文档,确实不完整。前一版文档主要写了“怎么用”,没有把“脚本具体做了什么、怎么做的、限制是什么”写进去。这里补充脚本实现说明,便于后续维护和修改。

支持功能:

  • 指定 ROS 2 bag 路径
  • 指定导出 topic
  • 指定图片输出目录
  • 指定视频输出文件路径
  • 支持 sensor_msgs/msg/CompressedImage
  • 支持 sensor_msgs/msg/Image
  • 可限制最大导出帧数,便于快速测试

脚本实现说明

脚本入口是 main(),整体执行流程如下:

  1. 解析命令行参数
  2. 检查是否至少指定了一种输出方式
  3. 解析 bag 路径
  4. 打开 rosbag reader
  5. 查找目标 topic 的消息类型
  6. 逐条读取消息并筛选指定 topic
  7. 将消息解码为 OpenCV 图像
  8. 按需写入图片序列和视频文件
  9. 输出导出统计信息

关键函数说明

parse_args()

负责解析命令行参数,当前支持以下输入:

  • bag
  • --topic
  • --output-dir
  • --video-output
  • --image-format
  • --fps
  • --start-index
  • --max-frames

其中:

  • bag 是位置参数
  • --topic 是必填参数
  • --output-dir--video-output 至少要提供一个

resolve_bag_uri(bag_arg)

负责把输入的 bag 路径标准化成绝对路径。

支持两种输入:

  • bag 目录
  • 单个 .mcap 文件

如果路径不存在,或者既不是目录也不是 .mcap 文件,会直接抛出异常。

open_reader(bag_uri)

负责创建 rosbag2_py.SequentialReader 并打开 bag。

处理逻辑:

  • 如果输入本身是 .mcap 文件,直接使用 mcap
  • 如果输入是 bag 目录,则检查 metadata.yaml
  • metadata.yaml 中包含 storage_identifier: mcap 时,使用 mcap
  • 否则默认使用 sqlite3

这样可以同时兼容常见的 ROS 2 bag 存储格式。

get_topic_type(reader, topic_name)

负责遍历 bag 中所有 topic,找到目标 topic 的消息类型。

例如你当前 bag 里的:

  • topic:/fays/atrak/cam2/compressed
  • type:sensor_msgs/msg/CompressedImage

如果 topic 不存在,会报错:

Topic not found in bag: <topic名>

decode_image(message, topic_type)

负责把 ROS 消息转换成 OpenCV 图像。

当前支持两类消息:

1. sensor_msgs/msg/CompressedImage

处理方式:

  • message.data 中取出压缩字节流
  • 转成 numpy.uint8 数组
  • 调用 cv2.imdecode(..., cv2.IMREAD_COLOR) 解码为 BGR 图像

这正对应你当前 bag 的消息类型。

2. sensor_msgs/msg/Image

当前支持这些编码:

  • bgr8
  • rgb8
  • mono8

处理方式:

  • 根据 heightwidth 和通道数重建图像数组
  • rgb8 会转换为 OpenCV 使用的 BGR
  • mono8 会转换成 3 通道 BGR,便于统一写视频

如果遇到其它编码,例如 yuv16UC132FC1 等,脚本会报错,不会自动处理。

build_video_writer(video_path, fps, frame_shape)

负责创建视频写入器。

脚本当前依次尝试这些编码器:

  • avc1
  • mp4v
  • XVID

处理策略:

  • 如果输出后缀是 .mp4,优先尝试适合 mp4 的编码器
  • 如果输出后缀是 .avi,优先尝试 XVID
  • 如果按后缀匹配失败,则回退到所有候选编码器逐个尝试

函数返回:

  • writer
  • 实际成功使用的编码器名称

主流程细节

参数检查

main() 中,首先会检查:

At least one of --output-dir or --video-output must be set.

也就是说,脚本不会允许“既不导图片,也不导视频”的空操作。

reader 初始化

脚本会先打开 bag,再读取 topic 类型:

  • reader = open_reader(bag_uri)
  • topic_type = get_topic_type(reader, args.topic)
  • message_type = get_message(topic_type)

这里 get_message() 用于根据字符串类型名动态获取 ROS 消息类,后续才能正确反序列化。

消息遍历

主循环逻辑如下:

  1. 使用 reader.read_next() 逐条读取 bag 消息
  2. 跳过不属于目标 topic 的消息
  3. 对目标消息执行 deserialize_message()
  4. 调用 decode_image() 解码成图像
  5. 根据参数决定是否写图片
  6. 根据参数决定是否写视频

这部分逻辑意味着:

  • bag 中即使有多个 topic,也只会导出你指定的那一路
  • 视频写入器只会在拿到第一帧后初始化,这样可以正确获得图像宽高

图片输出

图片命名规则为:

frame_000000.jpg
frame_000001.jpg
frame_000002.jpg

具体格式受这两个参数影响:

  • --start-index
  • --image-format

视频输出

视频只会创建一次,在收到第一帧时初始化。原因是写视频前必须先知道图像尺寸:

  • height
  • width

初始化完成后,后续每一帧都会直接写入同一个输出文件。

结束条件

脚本在以下情况结束:

  • bag 已全部读完
  • 达到 --max-frames 指定的上限

结束时会释放视频写入器,并输出:

Exported frames: <数量>

当前脚本的限制

当前脚本能满足“从 bag 提取图像并保存”的需求,但有以下限制:

1. 只支持两类消息

仅支持:

  • sensor_msgs/msg/CompressedImage
  • sensor_msgs/msg/Image

2. Image 编码支持有限

只支持:

  • bgr8
  • rgb8
  • mono8

3. 视频编码参数不可精确控制

脚本当前依赖 OpenCV VideoWriter,因此通常不能精确控制:

  • H.264 profile
  • GOP 长度
  • 是否禁用 B-frames
  • 目标码率

如果后续要满足严格交付规范,应改成:

  1. 先导出图片序列
  2. 再调用 ffmpeg 生成最终视频

4. 没有按消息时间戳重采样

当前视频帧率完全由 --fps 指定,而不是根据 bag 消息原始时间戳动态计算。

这意味着:

  • 如果 bag 原始帧率不是固定值,导出视频的时间表现可能和采集时并不完全一致
  • 当前脚本更适合做内容提取和快速检查,不是严格时间还原工具

后续可扩展方向

如果后面要继续增强脚本,建议优先考虑这些方向:

  1. 增加 ffmpeg 编码模式,精确控制 H.264 参数
  2. 支持按原始消息时间戳生成视频
  3. 支持更多 sensor_msgs/msg/Image 编码格式
  4. 支持导出时叠加时间戳或帧号
  5. 支持按时间区间截取 bag 内容

适用场景

适用于以下需求:

  • 从 bag 中提取某一路相机图像
  • 将图像保存为连续图片文件
  • 将图像编码成视频文件
  • 在不同 bag、不同 topic、不同输出路径之间灵活切换

基本命令

python3 scripts/export_rosbag_images.py <bag路径> --topic <topic名> [可选参数]

参数说明

  • bag ROS 2 bag 路径,可以是 bag 目录,也可以直接是 .mcap 文件。

  • --topic 必填。要导出的图像 topic。

  • --output-dir 可选。导出图片序列的目录。

  • --video-output 可选。导出视频文件的路径,例如 output.mp4

  • --image-format 可选。图片格式,支持 jpgpng,默认 jpg

  • --fps 可选。导出视频时使用的帧率,默认 30

  • --start-index 可选。图片起始编号,默认 0

  • --max-frames 可选。最多导出多少帧,常用于测试。

至少指定一种输出

以下两个参数至少要提供一个:

  • --output-dir
  • --video-output

也就是说,可以:

  • 只导出图片
  • 只导出视频
  • 同时导出图片和视频

针对当前 bag 的已知信息

你当前提供的 bag 信息如下:

  • bag 路径:/home/kvzjj/下载/rosbag2_2026_04_17-15_56_36
  • topic:/fays/atrak/cam2/compressed
  • 消息类型:sensor_msgs/msg/CompressedImage
  • 存储格式:mcap

实际使用示例

1. 同时导出图片和视频

python3 scripts/export_rosbag_images.py \
  "/home/kvzjj/下载/rosbag2_2026_04_17-15_56_36" \
  --topic /fays/atrak/cam2/compressed \
  --output-dir "/home/kvzjj/下载/cam2_frames" \
  --video-output "/home/kvzjj/下载/cam2.mp4" \
  --fps 30

2. 只导出图片

python3 scripts/export_rosbag_images.py \
  "/home/kvzjj/下载/rosbag2_2026_04_17-15_56_36" \
  --topic /fays/atrak/cam2/compressed \
  --output-dir "/home/kvzjj/下载/cam2_frames"

3. 只导出视频

python3 scripts/export_rosbag_images.py \
  "/home/kvzjj/下载/rosbag2_2026_04_17-15_56_36" \
  --topic /fays/atrak/cam2/compressed \
  --video-output "/home/kvzjj/下载/cam2.mp4" \
  --fps 30

4. 先测试前 5 帧

python3 scripts/export_rosbag_images.py \
  "/home/kvzjj/下载/rosbag2_2026_04_17-15_56_36" \
  --topic /fays/atrak/cam2/compressed \
  --output-dir "/home/kvzjj/下载/rosbag_export_test_frames" \
  --video-output "/home/kvzjj/下载/rosbag_export_test.mp4" \
  --max-frames 5

输出结果说明

如果指定了图片目录,例如:

--output-dir "/home/kvzjj/下载/cam2_frames"

则输出图片文件名类似:

frame_000000.jpg
frame_000001.jpg
frame_000002.jpg

如果指定了视频输出,例如:

--video-output "/home/kvzjj/下载/cam2.mp4"

则会生成对应的视频文件。

编码说明

当前脚本在导出视频时,优先尝试以下编码器:

  • avc1
  • mp4v
  • XVID

其中:

  • avc1 通常对应 H.264
  • 如果系统 OpenCV/FFmpeg 环境不支持 H.264,脚本会自动回退到其他可用编码器

注意事项

1. 如果 topic 不存在

脚本会报错:

Topic not found in bag: <topic名>

此时先执行:

ros2 bag info <bag路径>

确认 topic 名称是否正确。

2. 如果视频无法按规范精确编码

当前脚本通过 OpenCV 的 VideoWriter 生成视频,适合快速导出和检查内容。

如果你需要严格满足如下视频规范:

  • H.264
  • GOP 固定为 30
  • 不允许 B-frames
  • 指定位率范围,例如 4 到 9 Mbps

则建议流程改为:

  1. 先导出图片序列
  2. 再使用 ffmpeg 编码成 mp4

因为 ffmpeg 可以更精确控制编码参数,而 OpenCV 通常不能完整控制这些细节。

以下是代码内容

#!/usr/bin/env python3

import argparse
import sys
from pathlib import Path

import cv2
import numpy as np
import rosbag2_py
from rclpy.serialization import deserialize_message
from rosidl_runtime_py.utilities import get_message


def parse_args():
    parser = argparse.ArgumentParser(
        description="Export image data from a ROS 2 bag topic to frames and/or video."
    )
    parser.add_argument(
        "bag",
        help="Path to rosbag directory or single .mcap file.",
    )
    parser.add_argument(
        "--topic",
        required=True,
        help="Image topic to export, e.g. /fays/atrak/cam2/compressed.",
    )
    parser.add_argument(
        "--output-dir",
        default=None,
        help="Directory for extracted image files.",
    )
    parser.add_argument(
        "--video-output",
        default=None,
        help="Output video file path, e.g. /tmp/output.mp4.",
    )
    parser.add_argument(
        "--image-format",
        choices=("jpg", "png"),
        default="jpg",
        help="Extracted frame image format.",
    )
    parser.add_argument(
        "--fps",
        type=float,
        default=30.0,
        help="FPS used when encoding video output.",
    )
    parser.add_argument(
        "--start-index",
        type=int,
        default=0,
        help="Starting frame index for exported images.",
    )
    parser.add_argument(
        "--max-frames",
        type=int,
        default=None,
        help="Optional maximum number of frames to export.",
    )
    return parser.parse_args()


def resolve_bag_uri(bag_arg: str) -> str:
    bag_path = Path(bag_arg).expanduser().resolve()
    if bag_path.is_dir():
        return str(bag_path)
    if bag_path.is_file() and bag_path.suffix == ".mcap":
        return str(bag_path)
    raise FileNotFoundError(f"Bag path does not exist or is unsupported: {bag_path}")


def open_reader(bag_uri: str):
    storage_id = "mcap" if bag_uri.endswith(".mcap") else "sqlite3"
    if bag_uri.endswith(".mcap"):
        storage_id = "mcap"
    else:
        metadata_path = Path(bag_uri) / "metadata.yaml"
        if metadata_path.exists():
            metadata_text = metadata_path.read_text(encoding="utf-8")
            if "storage_identifier: mcap" in metadata_text:
                storage_id = "mcap"

    reader = rosbag2_py.SequentialReader()
    reader.open(
        rosbag2_py.StorageOptions(uri=bag_uri, storage_id=storage_id),
        rosbag2_py.ConverterOptions(
            input_serialization_format="cdr",
            output_serialization_format="cdr",
        ),
    )
    return reader


def get_topic_type(reader, topic_name: str) -> str:
    for topic_info in reader.get_all_topics_and_types():
        if topic_info.name == topic_name:
            return topic_info.type
    raise ValueError(f"Topic not found in bag: {topic_name}")


def decode_image(message, topic_type: str):
    if topic_type == "sensor_msgs/msg/CompressedImage":
        array = np.frombuffer(message.data, dtype=np.uint8)
        frame = cv2.imdecode(array, cv2.IMREAD_COLOR)
        if frame is None:
            raise ValueError("Failed to decode CompressedImage payload")
        return frame

    if topic_type == "sensor_msgs/msg/Image":
        if message.encoding not in ("bgr8", "rgb8", "mono8"):
            raise ValueError(f"Unsupported image encoding: {message.encoding}")

        frame = np.frombuffer(message.data, dtype=np.uint8)
        channels = 1 if message.encoding == "mono8" else 3
        frame = frame.reshape((message.height, message.width, channels))
        if message.encoding == "rgb8":
            frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
        elif message.encoding == "mono8":
            frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR)
        return frame

    raise ValueError(f"Unsupported topic type: {topic_type}")


def build_video_writer(video_path: Path, fps: float, frame_shape):
    height, width = frame_shape[:2]
    video_path.parent.mkdir(parents=True, exist_ok=True)

    candidates = (
        ("avc1", ".mp4"),
        ("mp4v", ".mp4"),
        ("XVID", ".avi"),
    )
    suffix = video_path.suffix.lower()

    for fourcc_name, expected_suffix in candidates:
        if suffix and suffix != expected_suffix:
            continue
        writer = cv2.VideoWriter(
            str(video_path),
            cv2.VideoWriter_fourcc(*fourcc_name),
            fps,
            (width, height),
        )
        if writer.isOpened():
            return writer, fourcc_name
        writer.release()

    for fourcc_name, _ in candidates:
        writer = cv2.VideoWriter(
            str(video_path),
            cv2.VideoWriter_fourcc(*fourcc_name),
            fps,
            (width, height),
        )
        if writer.isOpened():
            return writer, fourcc_name
        writer.release()

    raise RuntimeError(f"Failed to create video writer for: {video_path}")


def main():
    args = parse_args()
    if args.output_dir is None and args.video_output is None:
        print("At least one of --output-dir or --video-output must be set.", file=sys.stderr)
        return 2

    bag_uri = resolve_bag_uri(args.bag)
    reader = open_reader(bag_uri)
    topic_type = get_topic_type(reader, args.topic)
    message_type = get_message(topic_type)

    output_dir = Path(args.output_dir).expanduser().resolve() if args.output_dir else None
    video_path = Path(args.video_output).expanduser().resolve() if args.video_output else None
    if output_dir is not None:
        output_dir.mkdir(parents=True, exist_ok=True)

    writer = None
    writer_codec = None
    frame_index = args.start_index
    exported_count = 0

    print(f"Bag: {bag_uri}")
    print(f"Topic: {args.topic}")
    print(f"Topic type: {topic_type}")
    if output_dir is not None:
        print(f"Image output dir: {output_dir}")
    if video_path is not None:
        print(f"Video output: {video_path}")

    try:
        while reader.has_next():
            current_topic, data, _ = reader.read_next()
            if current_topic != args.topic:
                continue

            msg = deserialize_message(data, message_type)
            frame = decode_image(msg, topic_type)

            if writer is None and video_path is not None:
                writer, writer_codec = build_video_writer(video_path, args.fps, frame.shape)
                print(f"Video codec: {writer_codec}")

            if output_dir is not None:
                image_path = output_dir / f"frame_{frame_index:06d}.{args.image_format}"
                if not cv2.imwrite(str(image_path), frame):
                    raise RuntimeError(f"Failed to write image: {image_path}")

            if writer is not None:
                writer.write(frame)

            frame_index += 1
            exported_count += 1

            if args.max_frames is not None and exported_count >= args.max_frames:
                break
    finally:
        if writer is not None:
            writer.release()

    print(f"Exported frames: {exported_count}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())