18 #ifndef DATA_FEED_RECEIVER_HPP
19 #define DATA_FEED_RECEIVER_HPP
24 #include "exceptions.hpp"
25 #include "limit_order_book/limit_order_book.hpp"
26 #include "messages.hpp"
35 template<
typename Handler>
39 asio::ip::udp::socket socket;
41 asio::ip::udp::endpoint sender_endpoint;
47 bool _is_session_active =
false;
55 input_buffer.emplace_back();
56 socket.async_receive_from(
59 [
this](
const std::error_code& error, std::size_t) {
61 throw Exception(
"DataFeed::read_message - " + error.message());
63 auto message = input_buffer.front();
64 input_buffer.pop_front();
66 if (reinterpret_cast<Messages::Header&>(message).sequence != ++last_sequence) {
67 std::cerr <<
"DataFeed::Receiver::read_message - out of sequence with server" << std::endl;
68 std::cerr <<
"DataFeed::Receiver::read_message - got sequence: " << reinterpret_cast<Messages::Header&>(message).sequence << std::endl;
69 std::cerr <<
"DataFeed::Receiver::read_message - expected sequence: " << last_sequence - 1 << std::endl;
70 last_sequence = reinterpret_cast<Messages::Header&>(message).sequence;
77 case Messages::MessageID::EndOfSession: { handle(
reinterpret_cast<Messages::EndOfSession&
>(message));
break; }
78 case Messages::MessageID::Clear: { handle(
reinterpret_cast<Messages::Clear&
>(message));
break; }
79 case Messages::MessageID::AddOrder: { handle(
reinterpret_cast<Messages::AddOrder&
>(message));
break; }
80 case Messages::MessageID::DeleteOrder: { handle(
reinterpret_cast<Messages::DeleteOrder&
>(message));
break; }
81 case Messages::MessageID::Trade: { handle(
reinterpret_cast<Messages::Trade&
>(message));
break; }
94 _is_session_active =
true;
96 handler.did_receive(
this, message);
103 inline void handle(
const Messages::EndOfSession& message) {
105 _is_session_active =
false;
107 handler.did_receive(
this, message);
114 inline void handle(
const Messages::Clear& message) {
118 handler.did_receive(
this, message);
125 inline void handle(
const Messages::AddOrder& message) {
127 book.limit(
side_to_LOB_side(message.side), message.uid, message.quantity, message.price);
129 handler.did_receive(
this, message);
136 inline void handle(
const Messages::DeleteOrder& message) {
137 if (not book.has(message.uid)) {
138 std::cerr <<
"DataFeed::Receiver::handle(DeleteOrder) - received delete for non-existent order: " << message.uid << std::endl;
143 book.cancel(message.uid);
145 handler.did_receive(
this, message);
152 inline void handle(
const Messages::Trade& message) {
153 if (not book.has(message.uid)) {
154 std::cerr <<
"DataFeed::Receiver::handle(Trade) - received trade for non-existent order: " << message.uid << std::endl;
160 book.reduce(message.uid, message.quantity);
162 handler.did_receive(
this, message);
169 inline void handle(
const Messages::Header& header) {
170 throw Exception(
"Receiver::handle(Header) - received message with invalid header " + header.to_string());
184 asio::io_context& io_context,
185 const asio::ip::address& listen,
186 const asio::ip::address& group,
189 bool reuse_enabled =
true
190 ) : socket(io_context), handler(handler_) {
192 asio::ip::udp::endpoint listen_endpoint(listen, port);
193 socket.open(listen_endpoint.protocol());
194 socket.set_option(asio::ip::udp::socket::reuse_address(reuse_enabled));
195 socket.bind(asio::ip::udp::endpoint(group, port));
196 socket.set_option(asio::ip::multicast::join_group(group));
202 auto size = std::numeric_limits<uint32_t>::max();
203 auto recv_buffer_size = asio::socket_base::receive_buffer_size(size);
204 socket.set_option(recv_buffer_size);
231 #endif // DATA_FEED_RECEIVER_HPP