脚本用途
脚本路径:scripts/export_rosbag_images.py
脚本本体没有写进文档,确实不完整。前一版文档主要写了“怎么用”,没有把“脚本具体做了什么、怎么做的、限制是什么”写进去。这里补充脚本实现说明,便于后续维护和修改。
支持功能:
- 指定 ROS 2 bag 路径
- 指定导出 topic
- 指定图片输出目录
- 指定视频输出文件路径
- 支持
sensor_msgs/msg/CompressedImage - 支持
sensor_msgs/msg/Image - 可限制最大导出帧数,便于快速测试
脚本实现说明
脚本入口是 main(),整体执行流程如下:
- 解析命令行参数
- 检查是否至少指定了一种输出方式
- 解析 bag 路径
- 打开 rosbag reader
- 查找目标 topic 的消息类型
- 逐条读取消息并筛选指定 topic
- 将消息解码为 OpenCV 图像
- 按需写入图片序列和视频文件
- 输出导出统计信息
关键函数说明
parse_args()
负责解析命令行参数,当前支持以下输入:
bag--topic--output-dir--video-output--image-format--fps--start-index--max-frames
其中:
bag是位置参数--topic是必填参数--output-dir和--video-output至少要提供一个
resolve_bag_uri(bag_arg)
负责把输入的 bag 路径标准化成绝对路径。
支持两种输入:
- bag 目录
- 单个
.mcap文件
如果路径不存在,或者既不是目录也不是 .mcap 文件,会直接抛出异常。
open_reader(bag_uri)
负责创建 rosbag2_py.SequentialReader 并打开 bag。
处理逻辑:
- 如果输入本身是
.mcap文件,直接使用mcap - 如果输入是 bag 目录,则检查
metadata.yaml - 当
metadata.yaml中包含storage_identifier: mcap时,使用mcap - 否则默认使用
sqlite3
这样可以同时兼容常见的 ROS 2 bag 存储格式。
get_topic_type(reader, topic_name)
负责遍历 bag 中所有 topic,找到目标 topic 的消息类型。
例如你当前 bag 里的:
- topic:
/fays/atrak/cam2/compressed - type:
sensor_msgs/msg/CompressedImage
如果 topic 不存在,会报错:
Topic not found in bag: <topic名>
decode_image(message, topic_type)
负责把 ROS 消息转换成 OpenCV 图像。
当前支持两类消息:
1. sensor_msgs/msg/CompressedImage
处理方式:
- 从
message.data中取出压缩字节流 - 转成
numpy.uint8数组 - 调用
cv2.imdecode(..., cv2.IMREAD_COLOR)解码为 BGR 图像
这正对应你当前 bag 的消息类型。
2. sensor_msgs/msg/Image
当前支持这些编码:
bgr8rgb8mono8
处理方式:
- 根据
height、width和通道数重建图像数组 rgb8会转换为 OpenCV 使用的 BGRmono8会转换成 3 通道 BGR,便于统一写视频
如果遇到其它编码,例如 yuv、16UC1、32FC1 等,脚本会报错,不会自动处理。
build_video_writer(video_path, fps, frame_shape)
负责创建视频写入器。
脚本当前依次尝试这些编码器:
avc1mp4vXVID
处理策略:
- 如果输出后缀是
.mp4,优先尝试适合 mp4 的编码器 - 如果输出后缀是
.avi,优先尝试XVID - 如果按后缀匹配失败,则回退到所有候选编码器逐个尝试
函数返回:
writer- 实际成功使用的编码器名称
主流程细节
参数检查
在 main() 中,首先会检查:
At least one of --output-dir or --video-output must be set.
也就是说,脚本不会允许“既不导图片,也不导视频”的空操作。
reader 初始化
脚本会先打开 bag,再读取 topic 类型:
reader = open_reader(bag_uri)topic_type = get_topic_type(reader, args.topic)message_type = get_message(topic_type)
这里 get_message() 用于根据字符串类型名动态获取 ROS 消息类,后续才能正确反序列化。
消息遍历
主循环逻辑如下:
- 使用
reader.read_next()逐条读取 bag 消息 - 跳过不属于目标 topic 的消息
- 对目标消息执行
deserialize_message() - 调用
decode_image()解码成图像 - 根据参数决定是否写图片
- 根据参数决定是否写视频
这部分逻辑意味着:
- bag 中即使有多个 topic,也只会导出你指定的那一路
- 视频写入器只会在拿到第一帧后初始化,这样可以正确获得图像宽高
图片输出
图片命名规则为:
frame_000000.jpg
frame_000001.jpg
frame_000002.jpg
具体格式受这两个参数影响:
--start-index--image-format
视频输出
视频只会创建一次,在收到第一帧时初始化。原因是写视频前必须先知道图像尺寸:
heightwidth
初始化完成后,后续每一帧都会直接写入同一个输出文件。
结束条件
脚本在以下情况结束:
- bag 已全部读完
- 达到
--max-frames指定的上限
结束时会释放视频写入器,并输出:
Exported frames: <数量>
当前脚本的限制
当前脚本能满足“从 bag 提取图像并保存”的需求,但有以下限制:
1. 只支持两类消息
仅支持:
sensor_msgs/msg/CompressedImagesensor_msgs/msg/Image
2. Image 编码支持有限
只支持:
bgr8rgb8mono8
3. 视频编码参数不可精确控制
脚本当前依赖 OpenCV VideoWriter,因此通常不能精确控制:
- H.264 profile
- GOP 长度
- 是否禁用 B-frames
- 目标码率
如果后续要满足严格交付规范,应改成:
- 先导出图片序列
- 再调用
ffmpeg生成最终视频
4. 没有按消息时间戳重采样
当前视频帧率完全由 --fps 指定,而不是根据 bag 消息原始时间戳动态计算。
这意味着:
- 如果 bag 原始帧率不是固定值,导出视频的时间表现可能和采集时并不完全一致
- 当前脚本更适合做内容提取和快速检查,不是严格时间还原工具
后续可扩展方向
如果后面要继续增强脚本,建议优先考虑这些方向:
- 增加
ffmpeg编码模式,精确控制 H.264 参数 - 支持按原始消息时间戳生成视频
- 支持更多
sensor_msgs/msg/Image编码格式 - 支持导出时叠加时间戳或帧号
- 支持按时间区间截取 bag 内容
适用场景
适用于以下需求:
- 从 bag 中提取某一路相机图像
- 将图像保存为连续图片文件
- 将图像编码成视频文件
- 在不同 bag、不同 topic、不同输出路径之间灵活切换
基本命令
python3 scripts/export_rosbag_images.py <bag路径> --topic <topic名> [可选参数]
参数说明
bagROS 2 bag 路径,可以是 bag 目录,也可以直接是.mcap文件。--topic必填。要导出的图像 topic。--output-dir可选。导出图片序列的目录。--video-output可选。导出视频文件的路径,例如output.mp4。--image-format可选。图片格式,支持jpg或png,默认jpg。--fps可选。导出视频时使用的帧率,默认30。--start-index可选。图片起始编号,默认0。--max-frames可选。最多导出多少帧,常用于测试。
至少指定一种输出
以下两个参数至少要提供一个:
--output-dir--video-output
也就是说,可以:
- 只导出图片
- 只导出视频
- 同时导出图片和视频
针对当前 bag 的已知信息
你当前提供的 bag 信息如下:
- bag 路径:
/home/kvzjj/下载/rosbag2_2026_04_17-15_56_36 - topic:
/fays/atrak/cam2/compressed - 消息类型:
sensor_msgs/msg/CompressedImage - 存储格式:
mcap
实际使用示例
1. 同时导出图片和视频
python3 scripts/export_rosbag_images.py \
"/home/kvzjj/下载/rosbag2_2026_04_17-15_56_36" \
--topic /fays/atrak/cam2/compressed \
--output-dir "/home/kvzjj/下载/cam2_frames" \
--video-output "/home/kvzjj/下载/cam2.mp4" \
--fps 30
2. 只导出图片
python3 scripts/export_rosbag_images.py \
"/home/kvzjj/下载/rosbag2_2026_04_17-15_56_36" \
--topic /fays/atrak/cam2/compressed \
--output-dir "/home/kvzjj/下载/cam2_frames"
3. 只导出视频
python3 scripts/export_rosbag_images.py \
"/home/kvzjj/下载/rosbag2_2026_04_17-15_56_36" \
--topic /fays/atrak/cam2/compressed \
--video-output "/home/kvzjj/下载/cam2.mp4" \
--fps 30
4. 先测试前 5 帧
python3 scripts/export_rosbag_images.py \
"/home/kvzjj/下载/rosbag2_2026_04_17-15_56_36" \
--topic /fays/atrak/cam2/compressed \
--output-dir "/home/kvzjj/下载/rosbag_export_test_frames" \
--video-output "/home/kvzjj/下载/rosbag_export_test.mp4" \
--max-frames 5
输出结果说明
如果指定了图片目录,例如:
--output-dir "/home/kvzjj/下载/cam2_frames"
则输出图片文件名类似:
frame_000000.jpg
frame_000001.jpg
frame_000002.jpg
如果指定了视频输出,例如:
--video-output "/home/kvzjj/下载/cam2.mp4"
则会生成对应的视频文件。
编码说明
当前脚本在导出视频时,优先尝试以下编码器:
avc1mp4vXVID
其中:
avc1通常对应 H.264- 如果系统 OpenCV/FFmpeg 环境不支持 H.264,脚本会自动回退到其他可用编码器
注意事项
1. 如果 topic 不存在
脚本会报错:
Topic not found in bag: <topic名>
此时先执行:
ros2 bag info <bag路径>
确认 topic 名称是否正确。
2. 如果视频无法按规范精确编码
当前脚本通过 OpenCV 的 VideoWriter 生成视频,适合快速导出和检查内容。
如果你需要严格满足如下视频规范:
- H.264
- GOP 固定为 30
- 不允许 B-frames
- 指定位率范围,例如 4 到 9 Mbps
则建议流程改为:
- 先导出图片序列
- 再使用
ffmpeg编码成 mp4
因为 ffmpeg 可以更精确控制编码参数,而 OpenCV 通常不能完整控制这些细节。
以下是代码内容
#!/usr/bin/env python3
import argparse
import sys
from pathlib import Path
import cv2
import numpy as np
import rosbag2_py
from rclpy.serialization import deserialize_message
from rosidl_runtime_py.utilities import get_message
def parse_args():
parser = argparse.ArgumentParser(
description="Export image data from a ROS 2 bag topic to frames and/or video."
)
parser.add_argument(
"bag",
help="Path to rosbag directory or single .mcap file.",
)
parser.add_argument(
"--topic",
required=True,
help="Image topic to export, e.g. /fays/atrak/cam2/compressed.",
)
parser.add_argument(
"--output-dir",
default=None,
help="Directory for extracted image files.",
)
parser.add_argument(
"--video-output",
default=None,
help="Output video file path, e.g. /tmp/output.mp4.",
)
parser.add_argument(
"--image-format",
choices=("jpg", "png"),
default="jpg",
help="Extracted frame image format.",
)
parser.add_argument(
"--fps",
type=float,
default=30.0,
help="FPS used when encoding video output.",
)
parser.add_argument(
"--start-index",
type=int,
default=0,
help="Starting frame index for exported images.",
)
parser.add_argument(
"--max-frames",
type=int,
default=None,
help="Optional maximum number of frames to export.",
)
return parser.parse_args()
def resolve_bag_uri(bag_arg: str) -> str:
bag_path = Path(bag_arg).expanduser().resolve()
if bag_path.is_dir():
return str(bag_path)
if bag_path.is_file() and bag_path.suffix == ".mcap":
return str(bag_path)
raise FileNotFoundError(f"Bag path does not exist or is unsupported: {bag_path}")
def open_reader(bag_uri: str):
storage_id = "mcap" if bag_uri.endswith(".mcap") else "sqlite3"
if bag_uri.endswith(".mcap"):
storage_id = "mcap"
else:
metadata_path = Path(bag_uri) / "metadata.yaml"
if metadata_path.exists():
metadata_text = metadata_path.read_text(encoding="utf-8")
if "storage_identifier: mcap" in metadata_text:
storage_id = "mcap"
reader = rosbag2_py.SequentialReader()
reader.open(
rosbag2_py.StorageOptions(uri=bag_uri, storage_id=storage_id),
rosbag2_py.ConverterOptions(
input_serialization_format="cdr",
output_serialization_format="cdr",
),
)
return reader
def get_topic_type(reader, topic_name: str) -> str:
for topic_info in reader.get_all_topics_and_types():
if topic_info.name == topic_name:
return topic_info.type
raise ValueError(f"Topic not found in bag: {topic_name}")
def decode_image(message, topic_type: str):
if topic_type == "sensor_msgs/msg/CompressedImage":
array = np.frombuffer(message.data, dtype=np.uint8)
frame = cv2.imdecode(array, cv2.IMREAD_COLOR)
if frame is None:
raise ValueError("Failed to decode CompressedImage payload")
return frame
if topic_type == "sensor_msgs/msg/Image":
if message.encoding not in ("bgr8", "rgb8", "mono8"):
raise ValueError(f"Unsupported image encoding: {message.encoding}")
frame = np.frombuffer(message.data, dtype=np.uint8)
channels = 1 if message.encoding == "mono8" else 3
frame = frame.reshape((message.height, message.width, channels))
if message.encoding == "rgb8":
frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
elif message.encoding == "mono8":
frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR)
return frame
raise ValueError(f"Unsupported topic type: {topic_type}")
def build_video_writer(video_path: Path, fps: float, frame_shape):
height, width = frame_shape[:2]
video_path.parent.mkdir(parents=True, exist_ok=True)
candidates = (
("avc1", ".mp4"),
("mp4v", ".mp4"),
("XVID", ".avi"),
)
suffix = video_path.suffix.lower()
for fourcc_name, expected_suffix in candidates:
if suffix and suffix != expected_suffix:
continue
writer = cv2.VideoWriter(
str(video_path),
cv2.VideoWriter_fourcc(*fourcc_name),
fps,
(width, height),
)
if writer.isOpened():
return writer, fourcc_name
writer.release()
for fourcc_name, _ in candidates:
writer = cv2.VideoWriter(
str(video_path),
cv2.VideoWriter_fourcc(*fourcc_name),
fps,
(width, height),
)
if writer.isOpened():
return writer, fourcc_name
writer.release()
raise RuntimeError(f"Failed to create video writer for: {video_path}")
def main():
args = parse_args()
if args.output_dir is None and args.video_output is None:
print("At least one of --output-dir or --video-output must be set.", file=sys.stderr)
return 2
bag_uri = resolve_bag_uri(args.bag)
reader = open_reader(bag_uri)
topic_type = get_topic_type(reader, args.topic)
message_type = get_message(topic_type)
output_dir = Path(args.output_dir).expanduser().resolve() if args.output_dir else None
video_path = Path(args.video_output).expanduser().resolve() if args.video_output else None
if output_dir is not None:
output_dir.mkdir(parents=True, exist_ok=True)
writer = None
writer_codec = None
frame_index = args.start_index
exported_count = 0
print(f"Bag: {bag_uri}")
print(f"Topic: {args.topic}")
print(f"Topic type: {topic_type}")
if output_dir is not None:
print(f"Image output dir: {output_dir}")
if video_path is not None:
print(f"Video output: {video_path}")
try:
while reader.has_next():
current_topic, data, _ = reader.read_next()
if current_topic != args.topic:
continue
msg = deserialize_message(data, message_type)
frame = decode_image(msg, topic_type)
if writer is None and video_path is not None:
writer, writer_codec = build_video_writer(video_path, args.fps, frame.shape)
print(f"Video codec: {writer_codec}")
if output_dir is not None:
image_path = output_dir / f"frame_{frame_index:06d}.{args.image_format}"
if not cv2.imwrite(str(image_path), frame):
raise RuntimeError(f"Failed to write image: {image_path}")
if writer is not None:
writer.write(frame)
frame_index += 1
exported_count += 1
if args.max_frames is not None and exported_count >= args.max_frames:
break
finally:
if writer is not None:
writer.release()
print(f"Exported frames: {exported_count}")
return 0
if __name__ == "__main__":
raise SystemExit(main())
ROS Bag 图像导出说明
本文采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处。
评论交流
欢迎留下你的想法