Couriers
Message routing and data transport
Reference for the OctoMY™ courier system, which handles message routing, data serialization, and application-layer communication.
Did You Know?
Couriers use different reliability modes for different data types. Sensor data uses unreliable (UDP-style) delivery because a stale reading is useless: the receiver wants the latest value, not a retransmitted old one. Plan transfers use reliable delivery because every byte must arrive intact. This hybrid approach gives OctoMY™ low latency for streaming data and integrity for bulk transfers.
Courier overview
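Couriers are the application-layer message handlers in OctoMY™. Each courier owns one kind of message: it serializes outgoing data, parses incoming data, and declares whether it needs reliable delivery.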
Courier architecture
Base class
class Courier : public QObject {
Q_OBJECT
public:
explicit Courier(QObject* parent = nullptr);
virtual ~Courier();
// Identification
virtual QString name() const = 0;
virtual quint8 id() const = 0;
// Message handling
virtual void onReceive(const QByteArray& data) = 0;
virtual QByteArray serialize() const = 0;
// Lifecycle
virtual void activate();
virtual void deactivate();
// Reliability
bool isReliable() const;
void setReliable(bool reliable);
// Timing
quint32 sendInterval() const;
void setSendInterval(quint32 ms);
signals:
void dataReady();
void error(const QString& message);
protected:
void scheduleSend();
CommsSession* session() const;
};
Courier mandate
struct CourierMandate {
quint8 courierId;
bool reliable;
quint32 sendIntervalMs;
quint32 priority;
bool active;
};
Built-in couriers
System couriers (0-15)
| ID | Name | Purpose |
|---|---|---|
| 0 | SystemCourier | Handshake, keepalive, errors |
| 1 | DiscoveryCourier | Peer discovery |
Application couriers (16-127)
| ID | Name | Purpose | Reliability |
|---|---|---|---|
| 16 | AgentStateCourier | State sync | Unreliable |
| 17 | SensorsCourier | Sensor data | Unreliable |
| 18 | ActuatorsCourier | Motor commands | Unreliable |
| 19 | BlobCourier | Large transfers | Reliable |
| 20 | PlanCourier | Plan transfer | Reliable |
| 21 | LogCourier | Remote logging | Reliable |
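The ID ranges and default reliability modes in the two tables above can be captured in a small lookup. The following is a minimal, Qt-free sketch rather than OctoMY™'s actual code: the names `CourierRange`, `Reliability`, `rangeOf`, and `defaultReliability` are hypothetical, and treating 128-255 as the custom range is an assumption based on the custom courier example using ID 128.

```cpp
#include <cstdint>

// Hypothetical classification of courier IDs by the ranges in this reference.
enum class CourierRange { System, Application, Custom };

// 0-15 is system and 16-127 is application (from the tables above).
// Treating everything from 128 up as "custom" is an assumption.
constexpr CourierRange rangeOf(std::uint8_t id) {
    if (id <= 15) return CourierRange::System;
    if (id <= 127) return CourierRange::Application;
    return CourierRange::Custom;
}

enum class Reliability { Unreliable, Reliable, Unknown };

// Default mode for the built-in application couriers listed above.
// IDs that the table does not cover yield Reliability::Unknown.
constexpr Reliability defaultReliability(std::uint8_t id) {
    switch (id) {
        case 16: case 17: case 18:      // state, sensors, actuators
            return Reliability::Unreliable;
        case 19: case 20: case 21:      // blob, plan, log
            return Reliability::Reliable;
        default:
            return Reliability::Unknown;
    }
}
```

A lookup like this keeps the streaming couriers (16-18) on the unreliable path and the transfer couriers (19-21) on the reliable one without scattering the IDs through the code.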
AgentStateCourier
Synchronizes node state between peers.
Message format
struct AgentStateMessage {
quint64 timestamp;
quint8 flags;
quint8 mode; // AgentMode enum
quint8 batteryLevel; // 0-100%
qint8 signalStrength; // dBm
quint16 activePlans; // Bitmask
quint16 errorFlags; // Error bitmask
};
Usage
AgentStateCourier* courier = new AgentStateCourier(session);
// Subscribe to state updates
connect(courier, &AgentStateCourier::stateReceived,
[](const AgentStateMessage& state) {
qDebug() << "Remote mode:" << state.mode;
qDebug() << "Battery:" << state.batteryLevel << "%";
});
// Send state update
courier->sendState(currentState);
Flags
| Bit | Flag | Description |
|---|---|---|
| 0 | CONNECTED | Hardware connected |
| 1 | MOTORS_ENABLED | Motors active |
| 2 | SENSORS_READY | Sensors calibrated |
| 3 | LOW_BATTERY | Battery warning |
| 4 | ERROR | Error condition |
| 5 | CHARGING | Battery charging |
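Because `flags` is a single byte, each row of the table corresponds to one bit. The sketch below shows one way to set and test those bits. The bit positions come from the table; the enum and the helpers `hasFlag` and `withFlag` are hypothetical names, not part of the documented API.

```cpp
#include <cstdint>

// Bit masks derived from the flags table; the enum name is hypothetical.
enum class AgentStateFlag : std::uint8_t {
    Connected     = 1u << 0,
    MotorsEnabled = 1u << 1,
    SensorsReady  = 1u << 2,
    LowBattery    = 1u << 3,
    Error         = 1u << 4,
    Charging      = 1u << 5,
};

// True when the given flag bit is set in the flags byte.
constexpr bool hasFlag(std::uint8_t flags, AgentStateFlag f) {
    return (flags & static_cast<std::uint8_t>(f)) != 0;
}

// Returns a copy of the flags byte with the given flag bit set.
constexpr std::uint8_t withFlag(std::uint8_t flags, AgentStateFlag f) {
    return static_cast<std::uint8_t>(flags | static_cast<std::uint8_t>(f));
}
```

For example, a connected agent with a low battery would carry bits 0 and 3, giving a flags byte of `0x09`.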
SensorsCourier
Streams sensor data from Agent to Remote.
Message format
struct SensorsMessage {
quint64 timestamp;
quint8 sensorCount;
SensorReading readings[]; // Variable length
};
struct SensorReading {
quint8 sensorId;
quint8 type; // SensorType enum
quint16 value; // Scaled value
quint8 quality; // 0-100 signal quality
};
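The variable-length `readings[]` member means the message cannot simply be copied as a fixed-size struct. The sketch below shows one way such a message could be flattened into bytes. It is an illustration under assumptions, not the actual wire format: it uses a `std::vector` in place of the count plus array, packs fields without padding, and writes big-endian (the default byte order of `QDataStream`, though this reference does not say which order OctoMY™ uses). The helper names `putBE` and `serialize` are hypothetical.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <vector>

struct SensorReading {
    std::uint8_t  sensorId;
    std::uint8_t  type;     // SensorType enum
    std::uint16_t value;    // Scaled value
    std::uint8_t  quality;  // 0-100 signal quality
};

struct SensorsMessage {
    std::uint64_t timestamp;
    std::vector<SensorReading> readings;  // stands in for sensorCount + readings[]
};

// Append an unsigned integer to the buffer, most significant byte first.
template <typename T>
void putBE(std::vector<std::uint8_t>& out, T v) {
    for (int shift = static_cast<int>(sizeof(T) - 1) * 8; shift >= 0; shift -= 8)
        out.push_back(static_cast<std::uint8_t>(v >> shift));
}

// Assumed layout: 8-byte timestamp, 1-byte count, then 5 bytes per reading.
std::vector<std::uint8_t> serialize(const SensorsMessage& msg) {
    std::vector<std::uint8_t> out;
    // The count field is one byte, so at most 255 readings are written.
    const std::size_t count = std::min<std::size_t>(msg.readings.size(), 255);
    putBE(out, msg.timestamp);
    putBE(out, static_cast<std::uint8_t>(count));
    for (std::size_t i = 0; i < count; ++i) {
        const SensorReading& r = msg.readings[i];
        putBE(out, r.sensorId);
        putBE(out, r.type);
        putBE(out, r.value);
        putBE(out, r.quality);
    }
    return out;
}
```

Under this assumed layout, a message with two readings occupies 8 + 1 + 2 × 5 = 19 bytes.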
Sensor types
| ID | Type | Unit | Scale |
|---|---|---|---|
| 0 | DISTANCE | cm | 1:1 |
| 1 | ANGLE | degrees | 10:1 |
| 2 | TEMPERATURE | Celsius | 10:1 |
| 3 | VOLTAGE | mV | 1:1 |
| 4 | CURRENT | mA | 1:1 |
| 5 | LIGHT | lux | 1:1 |
| 6 | ACCELERATION | mg | 1:1 |
| 7 | ANGULAR_RATE | mdps | 1:1 |
| 8 | BOOLEAN | true/false | 1:1 |
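The Scale column tells the receiver how to turn the raw `quint16` value back into the listed unit. The sketch below assumes that "10:1" means ten raw counts per unit (so a raw temperature of 215 is 21.5 °C) and that "1:1" means the raw value is already in the listed unit. That reading of the column is an assumption, and the names `SensorType`, `rawCountsPerUnit`, and `toUnits` are hypothetical.

```cpp
#include <cstdint>

// Sensor type IDs from the table above.
enum class SensorType : std::uint8_t {
    Distance = 0, Angle = 1, Temperature = 2, Voltage = 3, Current = 4,
    Light = 5, Acceleration = 6, AngularRate = 7, Boolean = 8
};

// Assumption: a "10:1" scale means 10 raw counts per unit, "1:1" means 1.
constexpr int rawCountsPerUnit(SensorType t) {
    return (t == SensorType::Angle || t == SensorType::Temperature) ? 10 : 1;
}

// Convert a raw scaled value into the unit listed in the table.
constexpr double toUnits(SensorType t, std::uint16_t raw) {
    return static_cast<double>(raw) / rawCountsPerUnit(t);
}
```

Sending tenths as an integer keeps the reading at two bytes while still carrying one decimal place of precision.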
Usage
SensorsCourier* courier = new SensorsCourier(session);
courier->setSendInterval(50); // 20 Hz
// Receive sensor data
connect(courier, &SensorsCourier::sensorsReceived,
[](const SensorsMessage& msg) {
for (int i = 0; i < msg.sensorCount; i++) {
auto& reading = msg.readings[i];
processSensor(reading.sensorId, reading.value);
}
});
// Send sensor data (on Agent)
SensorsMessage msg;
msg.timestamp = currentTimeMs();
msg.sensorCount = activeSensors.count();
// ... populate readings
courier->send(msg);
ActuatorsCourier
Sends actuator commands from Remote to Agent.
Message format
struct ActuatorsMessage {
quint64 timestamp;
quint8 actuatorCount;
ActuatorCommand commands[];
};
struct ActuatorCommand {
quint8 actuatorId;
quint8 type; // ActuatorType enum
qint16 value; // Target value
quint8 flags; // Control flags
};
Actuator types
| ID | Type | Value Range |
|---|---|---|
| 0 | DC_MOTOR | -100 to 100 |
| 1 | SERVO | 0 to 180 |
| 2 | CONTINUOUS_SERVO | -100 to 100 |
| 3 | STEPPER | steps/sec |
| 4 | RELAY | 0 or 1 |
| 5 | PWM | 0 to 255 |
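Since `value` is a signed 16-bit field, it can hold numbers well outside the ranges in the table, so a sender may want to clamp before transmitting. The following sketch applies the table's ranges. It is illustrative only: `clampActuatorValue` is a hypothetical helper, and passing STEPPER values through unchanged is an assumption made because the table gives a unit rather than a range for that type.

```cpp
#include <algorithm>
#include <cstdint>

// Actuator type IDs from the table above.
enum class ActuatorType : std::uint8_t {
    DcMotor = 0, Servo = 1, ContinuousServo = 2, Stepper = 3, Relay = 4, Pwm = 5
};

// Limit v to the inclusive range [lo, hi].
static std::int16_t clampTo(std::int16_t v, std::int16_t lo, std::int16_t hi) {
    return std::max(lo, std::min(v, hi));
}

// Clamp a command value to the range listed for its actuator type.
std::int16_t clampActuatorValue(ActuatorType type, std::int16_t value) {
    switch (type) {
        case ActuatorType::DcMotor:
        case ActuatorType::ContinuousServo: return clampTo(value, -100, 100);
        case ActuatorType::Servo:           return clampTo(value, 0, 180);
        case ActuatorType::Relay:           return clampTo(value, 0, 1);
        case ActuatorType::Pwm:             return clampTo(value, 0, 255);
        case ActuatorType::Stepper:         return value;  // no range listed
    }
    return value;
}
```

Clamping on the sending side keeps out-of-range input (for example from a joystick mapping) from reaching the agent as an invalid command.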
Usage
ActuatorsCourier* courier = new ActuatorsCourier(session);
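// Send motor commands
// Note: commands[] is declared without a size, so this snippet assumes the
// message provides storage for at least actuatorCount entries.
ActuatorsMessage msg;
msg.timestamp = currentTimeMs();
msg.actuatorCount = 2;
msg.commands[0] = {0, DC_MOTOR, 60, 0}; // Left motor 60%
msg.commands[1] = {1, DC_MOTOR, 60, 0}; // Right motor 60%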
courier->send(msg);
// Receive commands (on Agent)
connect(courier, &ActuatorsCourier::commandReceived,
[](const ActuatorsMessage& msg) {
for (int i = 0; i < msg.actuatorCount; i++) {
applyActuator(msg.commands[i]);
}
});
BlobCourier
Handles large data transfers with chunking and reliability.
Blob types
| Type | Description | Max Size |
|---|---|---|
| PLAN | OPAL plan file | 1 MB |
| CONFIG | Configuration | 64 KB |
| FIRMWARE | Firmware update | 16 MB |
| LOG | Log data | 1 MB |
| MEDIA | Images/audio | 10 MB |
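Chunking and the per-type size limits can be illustrated with a short, Qt-free sketch. It is not the BlobCourier implementation: the `Chunk` layout, the helper names, and the chunk size passed by the caller are all hypothetical, and the limits assume binary units (1 KB = 1024 bytes), which this reference does not specify.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <vector>

enum class BlobType { Plan, Config, Firmware, Log, Media };

constexpr std::size_t KiB = 1024;
constexpr std::size_t MiB = 1024 * KiB;

// Size limits from the table above, assuming binary units.
constexpr std::size_t maxBlobSize(BlobType t) {
    switch (t) {
        case BlobType::Plan:     return 1 * MiB;
        case BlobType::Config:   return 64 * KiB;
        case BlobType::Firmware: return 16 * MiB;
        case BlobType::Log:      return 1 * MiB;
        case BlobType::Media:    return 10 * MiB;
    }
    return 0;
}

// Hypothetical chunk record: a sequence index plus a slice of the blob.
struct Chunk {
    std::uint32_t index;
    std::vector<std::uint8_t> payload;
};

// Split a blob into fixed-size chunks; the last chunk may be shorter.
std::vector<Chunk> splitIntoChunks(const std::vector<std::uint8_t>& data,
                                   std::size_t chunkSize) {
    std::vector<Chunk> chunks;
    if (chunkSize == 0) return chunks;  // avoid an endless loop
    std::uint32_t index = 0;
    for (std::size_t off = 0; off < data.size(); off += chunkSize) {
        const std::size_t len = std::min(chunkSize, data.size() - off);
        const auto first = data.begin() + static_cast<std::ptrdiff_t>(off);
        chunks.push_back({index++, std::vector<std::uint8_t>(
            first, first + static_cast<std::ptrdiff_t>(len))});
    }
    return chunks;
}
```

The index on each chunk is what would let a receiver reassemble the blob in order and notice a missing piece; how acknowledgements and retransmission work is outside this sketch.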
Transfer protocol
Usage
BlobCourier* courier = new BlobCourier(session);
// Send blob
QByteArray planData = loadPlan("patrol.opal");
BlobFuture* future = courier->sendBlob(PLAN, planData);
connect(future, &BlobFuture::progress, [](float percent) {
updateProgress(percent);
});
connect(future, &BlobFuture::complete, []() {
qDebug() << "Plan sent successfully";
});
connect(future, &BlobFuture::failed, [](const QString& error) {
qDebug() << "Transfer failed:" << error;
});
// Receive blob
connect(courier, &BlobCourier::blobReceived,
[](BlobType type, const QByteArray& data) {
if (type == PLAN) {
installPlan(data);
}
});
DiscoveryCourier
Handles peer discovery on the local network.
Discovery protocol
Usage
DiscoveryCourier* courier = new DiscoveryCourier(channel);
// Start discovery
courier->startDiscovery();
// Handle discovered nodes
connect(courier, &DiscoveryCourier::nodeDiscovered,
[](const NodeInfo& info) {
qDebug() << "Found:" << info.name;
qDebug() << "Type:" << info.nodeType;
qDebug() << "Address:" << info.address;
});
// Announce ourselves
courier->announce();
CourierSet
Manages a collection of couriers for a node type.
Node-specific courier sets
// Agent side
class AgentCourierSet : public CourierSet {
public:
SensorsCourier* sensors; // Sends sensor data
ActuatorsCourier* actuators; // Receives commands
AgentStateCourier* state; // Sends state
BlobCourier* blob; // Receives plans
};
// Remote side
class RemoteCourierSet : public CourierSet {
public:
SensorsCourier* sensors; // Receives sensor data
ActuatorsCourier* actuators; // Sends commands
AgentStateCourier* state; // Receives state
BlobCourier* blob; // Sends plans
};
Usage
// Create courier set for session
AgentCourierSet* couriers = new AgentCourierSet(session);
// Activate all couriers
couriers->activate();
// Access individual couriers
couriers->sensors->setSendInterval(20);
couriers->state->setSendInterval(100);
// Deactivate when done
couriers->deactivate();
Custom couriers
Creating a custom courier
class MyCourier : public Courier {
Q_OBJECT
public:
explicit MyCourier(QObject* parent = nullptr)
: Courier(parent) {}
QString name() const override {
return "MyCourier";
}
quint8 id() const override {
return 128; // Custom courier ID range
}
void onReceive(const QByteArray& data) override {
// Parse incoming data
MyMessage msg;
QDataStream stream(data);
stream >> msg.field1 >> msg.field2;
emit messageReceived(msg);
}
QByteArray serialize() const override {
// Serialize outgoing data
QByteArray data;
QDataStream stream(&data, QIODevice::WriteOnly);
stream << mCurrentMessage.field1
<< mCurrentMessage.field2;
return data;
}
void sendMessage(const MyMessage& msg) {
mCurrentMessage = msg;
scheduleSend();
}
signals:
void messageReceived(const MyMessage& msg);
private:
MyMessage mCurrentMessage;
};
Registering custom courier
// Register with session
session->registerCourier(myCourier);
// Or add to courier set
class MyCourierSet : public CourierSet {
public:
MyCourierSet(CommsSession* session)
: CourierSet(session)
{
myCourier = new MyCourier(this);
addCourier(myCourier);
}
MyCourier* myCourier;
};
Courier timing
Send scheduling
Priority and batching
// Couriers are sent in priority order
courier->setPriority(10); // Higher = first
// Multiple couriers batched in one packet when possible
session->setBatchingEnabled(true);
session->setMaxBatchDelay(5); // Max wait for batching
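To make the interplay of send intervals and priorities concrete, the sketch below picks the couriers that are due at a given time and orders them so that higher priorities go first. It reuses the `CourierMandate` fields from this reference, but the `ScheduledCourier` wrapper, the `dueCourierIds` function, and the rule "due when at least `sendIntervalMs` has elapsed since the last send" are assumptions for illustration, not the session's actual scheduler.

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// Same fields as the CourierMandate struct in this reference.
struct CourierMandate {
    std::uint8_t  courierId;
    bool          reliable;
    std::uint32_t sendIntervalMs;
    std::uint32_t priority;
    bool          active;
};

// Hypothetical bookkeeping: a mandate plus the time of its last send.
struct ScheduledCourier {
    CourierMandate mandate;
    std::uint64_t  lastSendMs;
};

// Returns the IDs of active couriers whose interval has elapsed,
// ordered by descending priority. Assumes nowMs >= lastSendMs.
std::vector<std::uint8_t> dueCourierIds(std::vector<ScheduledCourier> couriers,
                                        std::uint64_t nowMs) {
    couriers.erase(
        std::remove_if(couriers.begin(), couriers.end(),
                       [nowMs](const ScheduledCourier& c) {
                           return !c.mandate.active ||
                                  nowMs - c.lastSendMs < c.mandate.sendIntervalMs;
                       }),
        couriers.end());
    // stable_sort keeps the original order among equal priorities.
    std::stable_sort(couriers.begin(), couriers.end(),
                     [](const ScheduledCourier& a, const ScheduledCourier& b) {
                         return a.mandate.priority > b.mandate.priority;
                     });
    std::vector<std::uint8_t> ids;
    for (const ScheduledCourier& c : couriers) ids.push_back(c.mandate.courierId);
    return ids;
}
```

With batching enabled, the resulting list is the order in which a session could pack courier payloads into one outgoing packet.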
Error handling
Courier errors
| Error | Cause | Recovery |
|---|---|---|
| SerializationError | Invalid data | Log and skip |
| DeserializationError | Corrupted packet | Request resend |
| TimeoutError | No response | Retry or fail |
| SessionError | Connection lost | Reconnect |
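The error-to-recovery mapping can be expressed as a simple dispatch. The sketch below is a hypothetical illustration: the enums and the `recoveryFor` and `shouldRetry` helpers are not part of the documented API, and the retry limit is an assumed parameter because this reference does not state how many retries a timeout allows.

```cpp
// Hypothetical enums mirroring the rows of the error table.
enum class CourierError { Serialization, Deserialization, Timeout, Session };
enum class Recovery { LogAndSkip, RequestResend, RetryOrFail, Reconnect };

// Map each error kind to the recovery action listed for it.
constexpr Recovery recoveryFor(CourierError e) {
    switch (e) {
        case CourierError::Serialization:   return Recovery::LogAndSkip;
        case CourierError::Deserialization: return Recovery::RequestResend;
        case CourierError::Timeout:         return Recovery::RetryOrFail;
        case CourierError::Session:         return Recovery::Reconnect;
    }
    return Recovery::LogAndSkip;
}

// "Retry or fail": retry while attempts remain. maxAttempts is an
// assumed, caller-chosen limit.
constexpr bool shouldRetry(unsigned attemptsSoFar, unsigned maxAttempts) {
    return attemptsSoFar < maxAttempts;
}
```

Keeping the mapping in one function makes it easy to audit that every error kind has a defined response.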
Error handling
connect(courier, &Courier::error,
[](const QString& error) {
qWarning() << "Courier error:" << error;
// Handle error appropriately
});
// In custom courier
try {
processData(data);
} catch (const std::exception& e) {
emit error(QString("Processing failed: %1").arg(e.what()));
}
Debugging couriers
Courier statistics
struct CourierStats {
quint64 messagesSent;
quint64 messagesReceived;
quint64 bytesSent;
quint64 bytesReceived;
quint32 errorCount;
quint32 averageLatencyMs;
};
CourierStats stats = courier->statistics();
qDebug() << "Messages sent:" << stats.messagesSent;
qDebug() << "Average latency:" << stats.averageLatencyMs << "ms";
Debug logging
// Enable courier debug logging
QLoggingCategory::setFilterRules("octomy.courier.debug=true");
// Or per-courier
courier->setDebugEnabled(true);