Skip to content

File mux.h

File List > jac > link > mux.h

Go to the documentation of this file.

#pragma once

#include "router.h"
#include "stream.h"
#include "linkTypes.h"
#include "encoders/encoderTypes.h"

#include <any>
#include <deque>
#include <functional>
#include <memory>
#include <istream>
#include <ostream>
#include <span>
#include <mutex>
#include <vector>


namespace jac {


enum class MuxError : int {
    INVALID_RECEIVE = 1,
    PACKETIZER = 2,
    PROCESSING = 3,
};


template<class Encoder>
class Mux : public DataLinkTx {
private:
    using Packetizer = typename Encoder::Packetizer;
    using Serializer = typename Encoder::Serializer;

    std::unique_ptr<DataLinkRx> _receiver;
    std::unique_ptr<Duplex> _stream;

    Packetizer _packetizer;

    std::mutex _writeMutex;

    std::function<void(MuxError, std::any)> _errorHandler;

    class MuxPacket : public Packet {
        using DataFrame = decltype(Serializer::buildDataFrame());

        std::reference_wrapper<Mux> _mux;
        uint8_t _channel;
        DataFrame _frame;
        bool sent = false;
    public:
        MuxPacket(Mux& mux, uint8_t channel) : _mux(mux), _channel(channel), _frame(Serializer::buildDataFrame()) {}

        bool put(uint8_t c) override {
            if (sent) {
                throw std::runtime_error("Packet already sent");
            }

            return _frame.put(c);
        }

        size_t put(std::span<const uint8_t> data) override {
            if (sent) {
                throw std::runtime_error("Packet already sent");
            }

            return _frame.put(data);
        }

        size_t space() const override {
            return Serializer::capacity() - _frame.size();
        }

        bool send() override {
            if (sent) {
                throw std::runtime_error("Packet already sent");
            }
            auto data = _frame.finalize(_channel);

            std::lock_guard<std::mutex> lock(_mux.get()._writeMutex);
            return _mux.get()._stream->write(data) == data.size() && _mux.get()._stream->flush();
        }
    };

    void receive() {
        int c;
        while ((c = _stream->get()) != EOF) {
            auto putRes = _packetizer.put(c);
            if (putRes > 0) {
                DecodeResult result = _packetizer.decode();
                if (result.valid) {
                    if (_receiver) {
                        _receiver->processPacket(result.channel, result.data);
                    }
                }
                else {
                    // handle invalid packet
                    if (_errorHandler) {
                        _errorHandler(MuxError::INVALID_RECEIVE, std::tuple<int, uint8_t>{ putRes, result.channel });
                    }
                }
            }
            else if (putRes < 0) {
                // handle protocol error
                if (_errorHandler) {
                    _errorHandler(MuxError::PACKETIZER, int(putRes));
                }
            }
        }
    }
public:
    Mux(std::unique_ptr<Duplex> stream) : _stream(std::move(stream)) {
        _stream->onData([this]() {
            try {
                receive();
            }
            catch (std::exception& e) {
                if (this->_errorHandler) {
                    this->_errorHandler(MuxError::PROCESSING, std::string("Exception: ") + e.what());
                }
            }
            catch (...) {
                if (this->_errorHandler) {
                    this->_errorHandler(MuxError::PROCESSING, std::string("Unknown exception"));
                }
            }
        });
    }
    Mux(const Mux&) = delete;
    Mux(Mux&&) = delete;

    void bindRx(std::unique_ptr<DataLinkRx> receiver) {
        _receiver = std::move(receiver);
    }

    std::unique_ptr<Packet> buildPacket(uint8_t channel) override {
        return std::make_unique<MuxPacket>(*this, channel);
    }

    size_t maxPacketSize() const override {
        return Serializer::capacity();
    }

    void setErrorHandler(std::function<void(MuxError, std::any)> handler) {
        _errorHandler = handler;
    }
};


} // namespace jac