基于线程延迟处理的 ROS 消息逻辑优化

在 ROS 开发中,我们常常需要对某些消息的处理逻辑进行优化。例如,某个话题的消息可能会频繁发布,而我们希望在消息连续时推迟某些操作,只有在消息停止后一定时间才触发特定逻辑。这种场景可以通过线程延迟处理机制实现。

本文以一个电笛控制系统为例,讲解如何通过线程延迟机制优化 ROS 的消息处理逻辑。


需求场景

  • 话题 /electric_horn 的消息以 std_msgs::Bool 类型发布,表示是否按下电笛按钮。
  • 当消息连续时,不执行心跳操作 HeartBeat
  • 当最后一条消息超过 1 秒未更新时,触发 HeartBeat

实现思路

  1. 延迟触发机制
    通过记录最近一次收到消息的时间,启动一个独立的线程监控消息间隔。只有当消息停止(超过 1 秒未更新)后,才触发 HeartBeat 操作。

  2. 线程控制
    使用 std::thread 来实现延迟触发逻辑,同时通过 std::atomic 控制线程的启动和停止,避免多线程竞态。

  3. ROS消息回调逻辑
    每次收到新消息时,更新最新时间并终止之前的延迟线程,重新启动监控逻辑。


代码实现

类成员变量

TowerController 类中,定义以下变量:

1
2
3
4
5
6
7
8
9
10
class TowerController {
private:
std::chrono::steady_clock::time_point lastMessageTime_; // 最近消息的时间
std::shared_ptr<std::thread> delayThread_; // 延迟处理的线程
std::atomic<bool> stopDelayThread_; // 控制线程停止的标志

void PressElectricHornCallback(const std_msgs::Bool::ConstPtr& press);
int ElectricHorn();
void HeartBeat(bool active);
};

构造函数与析构函数

初始化时间点和线程控制变量,并确保析构时安全退出线程:

1
2
3
4
5
6
7
8
9
10
11
TowerController::TowerController() {
lastMessageTime_ = std::chrono::steady_clock::now();
stopDelayThread_ = false;
}

TowerController::~TowerController() {
stopDelayThread_ = true;
if (delayThread_ && delayThread_->joinable()) {
delayThread_->join();
}
}

消息回调函数

每次接收到消息时,更新最新时间,终止之前的延迟线程并启动新的监控线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
void TowerController::PressElectricHornCallback(const std_msgs::Bool::ConstPtr& press) {
if (press->data) {
INFO("收到电笛按钮按下信号");
ctThread->Enqueue(std::bind(&TowerController::ElectricHorn, this)); // 异步处理电笛逻辑

// 更新最后一次收到消息的时间
lastMessageTime_ = std::chrono::steady_clock::now();

// 停止现有的延迟线程
stopDelayThread_ = true;
if (delayThread_ && delayThread_->joinable()) {
delayThread_->join();
}
stopDelayThread_ = false;

// 启动新的延迟线程
delayThread_ = std::make_shared<std::thread>([this]() {
while (!stopDelayThread_) {
auto now = std::chrono::steady_clock::now();
if (std::chrono::duration_cast<std::chrono::seconds>(now - lastMessageTime_).count() >= 1) {
INFO("消息间隔超过1秒,触发心跳信号");
HeartBeat(true); // 触发心跳操作
return;
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
});
}
}

电笛逻辑

保持 ElectricHorn 的核心逻辑不变,负责单纯的电笛控制:

1
2
3
4
5
6
int TowerController::ElectricHorn() {
INFO("电笛按钮被触发");
ElectricHornControl(); // 执行电笛硬件控制

return 0;
}

优化点与注意事项

  1. 避免线程泄漏
    每次启动新的延迟线程前,确保之前的线程安全退出(使用 join)。

  2. 提高线程效率
    使用 std::this_thread::sleep_for 来减少线程对 CPU 的占用,同时避免繁忙轮询。

  3. 多线程安全性
    使用 std::atomic 来控制线程停止标志,避免线程间的数据竞争。

  4. 可维护性
    HeartBeat 的触发逻辑外部化,确保核心功能(如 ElectricHorn)独立,逻辑清晰。


完整效果

  • 在消息频繁时,只执行电笛控制逻辑,不触发心跳信号。
  • 当最后一条消息超过 1 秒未更新后,延迟线程触发心跳信号。
  • 避免频繁触发心跳信号,提高系统效率。

总结

通过线程延迟机制,我们可以优雅地解决 ROS 消息的连续处理问题。这种模式不仅适用于电笛控制系统,还可以推广到其他需要延迟触发的场景,如报警信号、状态同步等。

使用延迟线程结合时间点记录,既保证了实时性,又提升了系统的灵活性,是一种实用且高效的设计模式。