Couriers

Message routing and data transport

Couriers

Reference for OctoMY™'s courier system that handles message routing, data serialization, and application-layer communication.

Did You Know?

Couriers use different reliability modes for different data types. Sensor data uses unreliable (UDP-style) delivery because stale readings are useless - you want the latest data, not retransmitted old data. But Plan transfers use reliable delivery because you need every byte to arrive correctly. This hybrid approach gives OctoMY™ the best of both worlds.


Courier overview

Couriers are the application-layer message handlers in OctoMY™:

Courier Overview


Courier architecture

Base class

class Courier : public QObject {
    Q_OBJECT

public:
    explicit Courier(QObject* parent = nullptr);
    virtual ~Courier();

    // Identification
    virtual QString name() const = 0;
    virtual quint8 id() const = 0;

    // Message handling
    virtual void onReceive(const QByteArray& data) = 0;
    virtual QByteArray serialize() const = 0;

    // Lifecycle
    virtual void activate();
    virtual void deactivate();

    // Reliability
    bool isReliable() const;
    void setReliable(bool reliable);

    // Timing
    quint32 sendInterval() const;
    void setSendInterval(quint32 ms);

signals:
    void dataReady();
    void error(const QString& message);

protected:
    void scheduleSend();
    CommsSession* session() const;
};

Courier mandate

struct CourierMandate {
    quint8 courierId;
    bool reliable;
    quint32 sendIntervalMs;
    quint32 priority;
    bool active;
};

Built-in couriers

System couriers (0-15)

ID Name Purpose
0 SystemCourier Handshake, keepalive, errors
1 DiscoveryCourier Peer discovery

Application couriers (16-127)

ID Name Purpose Reliability
16 AgentStateCourier State sync Unreliable
17 SensorsCourier Sensor data Unreliable
18 ActuatorsCourier Motor commands Unreliable
19 BlobCourier Large transfers Reliable
20 PlanCourier Plan transfer Reliable
21 LogCourier Remote logging Reliable

AgentStateCourier

Synchronizes node state between peers.

Message format

struct AgentStateMessage {
    quint64 timestamp;
    quint8 flags;
    quint8 mode;           // AgentMode enum
    quint8 batteryLevel;   // 0-100%
    qint8 signalStrength;  // dBm
    quint16 activePlans;   // Bitmask
    quint16 errorFlags;    // Error bitmask
};

Usage

AgentStateCourier* courier = new AgentStateCourier(session);

// Subscribe to state updates
connect(courier, &AgentStateCourier::stateReceived,
        [](const AgentStateMessage& state) {
    qDebug() << "Remote mode:" << state.mode;
    qDebug() << "Battery:" << state.batteryLevel << "%";
});

// Send state update
courier->sendState(currentState);

Flags

Bit Flag Description
0 CONNECTED Hardware connected
1 MOTORS_ENABLED Motors active
2 SENSORS_READY Sensors calibrated
3 LOW_BATTERY Battery warning
4 ERROR Error condition
5 CHARGING Battery charging

SensorsCourier

Streams sensor data from Agent to Remote.

Message format

struct SensorsMessage {
    quint64 timestamp;
    quint8 sensorCount;
    SensorReading readings[];  // Variable length
};

struct SensorReading {
    quint8 sensorId;
    quint8 type;        // SensorType enum
    quint16 value;      // Scaled value
    quint8 quality;     // 0-100 signal quality
};

Sensor types

ID Type Unit Scale
0 DISTANCE cm 1:1
1 ANGLE degrees 10:1
2 TEMPERATURE Celsius 10:1
3 VOLTAGE mV 1:1
4 CURRENT mA 1:1
5 LIGHT lux 1:1
6 ACCELERATION mg 1:1
7 ANGULAR_RATE mdps 1:1
8 BOOLEAN true/false 1:1

Usage

SensorsCourier* courier = new SensorsCourier(session);
courier->setSendInterval(50);  // 20 Hz

// Receive sensor data
connect(courier, &SensorsCourier::sensorsReceived,
        [](const SensorsMessage& msg) {
    for (int i = 0; i < msg.sensorCount; i++) {
        auto& reading = msg.readings[i];
        processSensor(reading.sensorId, reading.value);
    }
});

// Send sensor data (on Agent)
SensorsMessage msg;
msg.timestamp = currentTimeMs();
msg.sensorCount = activeSensors.count();
// ... populate readings
courier->send(msg);

ActuatorsCourier

Sends actuator commands from Remote to Agent.

Message format

struct ActuatorsMessage {
    quint64 timestamp;
    quint8 actuatorCount;
    ActuatorCommand commands[];
};

struct ActuatorCommand {
    quint8 actuatorId;
    quint8 type;        // ActuatorType enum
    qint16 value;       // Target value
    quint8 flags;       // Control flags
};

Actuator types

ID Type Value Range
0 DC_MOTOR -100 to 100
1 SERVO 0 to 180
2 CONTINUOUS_SERVO -100 to 100
3 STEPPER steps/sec
4 RELAY 0 or 1
5 PWM 0 to 255

Usage

ActuatorsCourier* courier = new ActuatorsCourier(session);

// Send motor commands
ActuatorsMessage msg;
msg.timestamp = currentTimeMs();
msg.actuatorCount = 2;
msg.commands[0] = {0, DC_MOTOR, 60, 0};   // Left motor 60%
msg.commands[1] = {1, DC_MOTOR, 60, 0};   // Right motor 60%
courier->send(msg);

// Receive commands (on Agent)
connect(courier, &ActuatorsCourier::commandReceived,
        [](const ActuatorsMessage& msg) {
    for (int i = 0; i < msg.actuatorCount; i++) {
        applyActuator(msg.commands[i]);
    }
});

BlobCourier

Handles large data transfers with chunking and reliability.

Blob types

Type Description Max Size
PLAN OPAL plan file 1 MB
CONFIG Configuration 64 KB
FIRMWARE Firmware update 16 MB
LOG Log data 1 MB
MEDIA Images/audio 10 MB

Transfer protocol

Blob Transfer Protocol

Usage

BlobCourier* courier = new BlobCourier(session);

// Send blob
QByteArray planData = loadPlan("patrol.opal");
BlobFuture* future = courier->sendBlob(PLAN, planData);

connect(future, &BlobFuture::progress, [](float percent) {
    updateProgress(percent);
});

connect(future, &BlobFuture::complete, []() {
    qDebug() << "Plan sent successfully";
});

connect(future, &BlobFuture::failed, [](const QString& error) {
    qDebug() << "Transfer failed:" << error;
});

// Receive blob
connect(courier, &BlobCourier::blobReceived,
        [](BlobType type, const QByteArray& data) {
    if (type == PLAN) {
        installPlan(data);
    }
});

DiscoveryCourier

Handles peer discovery on local network.

Discovery protocol

Discovery Messages

Usage

DiscoveryCourier* courier = new DiscoveryCourier(channel);

// Start discovery
courier->startDiscovery();

// Handle discovered nodes
connect(courier, &DiscoveryCourier::nodeDiscovered,
        [](const NodeInfo& info) {
    qDebug() << "Found:" << info.name;
    qDebug() << "Type:" << info.nodeType;
    qDebug() << "Address:" << info.address;
});

// Announce ourselves
courier->announce();

CourierSet

Manages a collection of couriers for a node type.

Node-specific courier sets

// Agent side
class AgentCourierSet : public CourierSet {
public:
    SensorsCourier* sensors;      // Sends sensor data
    ActuatorsCourier* actuators;  // Receives commands
    AgentStateCourier* state;     // Sends state
    BlobCourier* blob;            // Receive plans
};

// Remote side
class RemoteCourierSet : public CourierSet {
public:
    SensorsCourier* sensors;      // Receives sensor data
    ActuatorsCourier* actuators;  // Sends commands
    AgentStateCourier* state;     // Receives state
    BlobCourier* blob;            // Send plans
};

Usage

// Create courier set for session
AgentCourierSet* couriers = new AgentCourierSet(session);

// Activate all couriers
couriers->activate();

// Access individual couriers
couriers->sensors->setSendInterval(20);
couriers->state->setSendInterval(100);

// Deactivate when done
couriers->deactivate();

Custom couriers

Creating a custom courier

class MyCourier : public Courier {
    Q_OBJECT

public:
    explicit MyCourier(QObject* parent = nullptr)
        : Courier(parent) {}

    QString name() const override {
        return "MyCourier";
    }

    quint8 id() const override {
        return 128;  // Custom courier ID range
    }

    void onReceive(const QByteArray& data) override {
        // Parse incoming data
        MyMessage msg;
        QDataStream stream(data);
        stream >> msg.field1 >> msg.field2;

        emit messageReceived(msg);
    }

    QByteArray serialize() const override {
        // Serialize outgoing data
        QByteArray data;
        QDataStream stream(&data, QIODevice::WriteOnly);
        stream << mCurrentMessage.field1
               << mCurrentMessage.field2;
        return data;
    }

    void sendMessage(const MyMessage& msg) {
        mCurrentMessage = msg;
        scheduleSend();
    }

signals:
    void messageReceived(const MyMessage& msg);

private:
    MyMessage mCurrentMessage;
};

Registering custom courier

// Register with session
session->registerCourier(myCourier);

// Or add to courier set
class MyCourierSet : public CourierSet {
public:
    MyCourierSet(CommsSession* session)
        : CourierSet(session)
    {
        myCourier = new MyCourier(this);
        addCourier(myCourier);
    }

    MyCourier* myCourier;
};

Courier timing

Send scheduling

Courier Send Timeline

Priority and batching

// Couriers are sent in priority order
courier->setPriority(10);  // Higher = first

// Multiple couriers batched in one packet when possible
session->setBatchingEnabled(true);
session->setMaxBatchDelay(5);  // Max wait for batching

Error handling

Courier errors

Error Cause Recovery
SerializationError Invalid data Log and skip
DeserializationError Corrupted packet Request resend
TimeoutError No response Retry or fail
SessionError Connection lost Reconnect

Error handling

connect(courier, &Courier::error,
        [](const QString& error) {
    qWarning() << "Courier error:" << error;
    // Handle error appropriately
});

// In custom courier
try {
    processData(data);
} catch (const std::exception& e) {
    emit error(QString("Processing failed: %1").arg(e.what()));
}

Debugging couriers

Courier statistics

struct CourierStats {
    quint64 messagesSent;
    quint64 messagesReceived;
    quint64 bytesSent;
    quint64 bytesReceived;
    quint32 errorCount;
    quint32 averageLatencyMs;
};

CourierStats stats = courier->statistics();
qDebug() << "Messages sent:" << stats.messagesSent;
qDebug() << "Average latency:" << stats.averageLatencyMs << "ms";

Debug logging

// Enable courier debug logging
QLoggingCategory::setFilterRules("octomy.courier.debug=true");

// Or per-courier
courier->setDebugEnabled(true);

In this section
Topics
reference couriers messaging serialization networking
See also