CBOE Emulator  1.0
receiver.hpp
1 // A class for handling limit order book depth messages from multi-cast.
2 // Copyright 2020 Christian Kauten
3 //
4 // Author: Christian Kauten (kautenja@auburn.edu)
5 //
6 // This program is free software: you can redistribute it and/or modify
7 // it under the terms of the GNU General Public License as published by
8 // the Free Software Foundation, either version 3 of the License, or
9 // (at your option) any later version.
10 // This program is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 // You should have received a copy of the GNU General Public License
15 // along with this program. If not, see <http://www.gnu.org/licenses/>.
16 //
17 
18 #ifndef DATA_FEED_RECEIVER_HPP
19 #define DATA_FEED_RECEIVER_HPP
20 
21 #include <functional>
22 #include <iostream>
23 #include <asio.hpp>
24 #include "exceptions.hpp"
25 #include "limit_order_book/limit_order_book.hpp"
26 #include "messages.hpp"
27 
29 namespace DataFeed {
30 
35 template<typename Handler>
36 class Receiver {
37  private:
39  asio::ip::udp::socket socket;
41  asio::ip::udp::endpoint sender_endpoint;
43  Messages::PacketQueue input_buffer;
47  bool _is_session_active = false;
49  Handler& handler;
51  SequenceNumber last_sequence = 0;
52 
54  void read_message() {
55  input_buffer.emplace_back();
56  socket.async_receive_from(
57  asio::buffer(input_buffer.back(), sizeof(Messages::Packet)),
58  sender_endpoint,
59  [this](const std::error_code& error, std::size_t) {
60  if (error) // an error occurred while reading a message
61  throw Exception("DataFeed::read_message - " + error.message());
62  // copy the message from the front of the queue and pop it off
63  auto message = input_buffer.front();
64  input_buffer.pop_front();
65  // check if the sequence number is out of order
66  if (reinterpret_cast<Messages::Header&>(message).sequence != ++last_sequence) {
67  std::cerr << "DataFeed::Receiver::read_message - out of sequence with server" << std::endl;
68  std::cerr << "DataFeed::Receiver::read_message - got sequence: " << reinterpret_cast<Messages::Header&>(message).sequence << std::endl;
69  std::cerr << "DataFeed::Receiver::read_message - expected sequence: " << last_sequence - 1 << std::endl;
70  last_sequence = reinterpret_cast<Messages::Header&>(message).sequence;
71  }
72  // read another message from the server
73  read_message();
74  // unwrap and handle the data using the template ID
75  switch (reinterpret_cast<Messages::Header&>(message).uid) {
76  case Messages::MessageID::StartOfSession: { handle(reinterpret_cast<Messages::StartOfSession&>(message)); break; }
77  case Messages::MessageID::EndOfSession: { handle(reinterpret_cast<Messages::EndOfSession&>(message)); break; }
78  case Messages::MessageID::Clear: { handle(reinterpret_cast<Messages::Clear&>(message)); break; }
79  case Messages::MessageID::AddOrder: { handle(reinterpret_cast<Messages::AddOrder&>(message)); break; }
80  case Messages::MessageID::DeleteOrder: { handle(reinterpret_cast<Messages::DeleteOrder&>(message)); break; }
81  case Messages::MessageID::Trade: { handle(reinterpret_cast<Messages::Trade&>(message)); break; }
82  default: { handle(reinterpret_cast<Messages::Header&>(message)); break; }
83  }
84  }
85  );
86  }
87 
92  inline void handle(const Messages::StartOfSession& message) {
93  // set the trading session to active
94  _is_session_active = true;
95  // pass the message to the handler
96  handler.did_receive(this, message);
97  }
98 
103  inline void handle(const Messages::EndOfSession& message) {
104  // set the trading session to inactive
105  _is_session_active = false;
106  // pass the message to the handler
107  handler.did_receive(this, message);
108  }
109 
114  inline void handle(const Messages::Clear& message) {
115  // clear all the orders in the book
116  book.clear();
117  // pass the message to the handler
118  handler.did_receive(this, message);
119  }
120 
125  inline void handle(const Messages::AddOrder& message) {
126  // add the order to the book
127  book.limit(side_to_LOB_side(message.side), message.uid, message.quantity, message.price);
128  // pass the message to the handler
129  handler.did_receive(this, message);
130  }
131 
136  inline void handle(const Messages::DeleteOrder& message) {
137  if (not book.has(message.uid)) { // the limit order was not found
138  std::cerr << "DataFeed::Receiver::handle(DeleteOrder) - received delete for non-existent order: " << message.uid << std::endl;
139  // TODO: handle error
140  return;
141  }
142  // delete the order in the book
143  book.cancel(message.uid);
144  // pass the message to the handler
145  handler.did_receive(this, message);
146  }
147 
152  inline void handle(const Messages::Trade& message) {
153  if (not book.has(message.uid)) { // the limit order was not found
154  std::cerr << "DataFeed::Receiver::handle(Trade) - received trade for non-existent order: " << message.uid << std::endl;
155  // TODO: handle error
156  return;
157  }
158  // reduce the size of the given order
159  // (removes order if remaining size is 0)
160  book.reduce(message.uid, message.quantity);
161  // pass the message to the handler
162  handler.did_receive(this, message);
163  }
164 
169  inline void handle(const Messages::Header& header) {
170  throw Exception("Receiver::handle(Header) - received message with invalid header " + header.to_string());
171  }
172 
173  public:
184  asio::io_context& io_context,
185  const asio::ip::address& listen,
186  const asio::ip::address& group,
187  uint16_t port,
188  Handler& handler_,
189  bool reuse_enabled = true
190  ) : socket(io_context), handler(handler_) {
191  // Create the socket so that multiple may be bound to the same address.
192  asio::ip::udp::endpoint listen_endpoint(listen, port);
193  socket.open(listen_endpoint.protocol());
194  socket.set_option(asio::ip::udp::socket::reuse_address(reuse_enabled));
195  socket.bind(asio::ip::udp::endpoint(group, port));
196  socket.set_option(asio::ip::multicast::join_group(group));
197  // detect if the compilation platform is MacOS and don't try to set
198  // the buffer, if so.
199 #if __APPLE__
200 #else
201  // set the size of the receive buffer to the maximum allowed size
202  auto size = std::numeric_limits<uint32_t>::max();
203  auto recv_buffer_size = asio::socket_base::receive_buffer_size(size);
204  socket.set_option(recv_buffer_size);
205 #endif
206  // read the first message
207  read_message();
208  }
209 
214  inline const LOB::LimitOrderBook& get_book() { return book; }
215 
220  inline bool is_session_active() const { return _is_session_active; }
221 
226  inline const Handler& get_handler() const { return handler; }
227 };
228 
229 } // namespace DataFeed
230 
231 #endif // DATA_FEED_RECEIVER_HPP
DataFeed::Receiver
A multi-cast receiver for recreating a LOB::LimitOrderBook from network messages.
Definition: receiver.hpp:36
DataFeed::Messages::Trade
A message that indicates a market order matches with a limit order.
Definition: messages.hpp:387
DataFeed::Messages::DeleteOrder
A message that indicates a limit order was added to the book.
Definition: messages.hpp:332
DataFeed::Receiver::get_book
const LOB::LimitOrderBook & get_book()
Return the limit order book for this receiver.
Definition: receiver.hpp:214
DataFeed::Messages::EndOfSession
A message that indicates the end of a trading session.
Definition: messages.hpp:506
DataFeed::Messages::Clear
A message that indicates to clear all orders in the order book.
Definition: messages.hpp:213
DataFeed::SequenceNumber
uint32_t SequenceNumber
A type for sequence numbers.
Definition: messages.hpp:36
DataFeed::LOB::LimitOrderBook
An order book for managing Limit / Order objects in a continuous double auction.
Definition: limit_order_book.hpp:41
DataFeed::Messages::StartOfSession
A message that indicates the start of a trading session.
Definition: messages.hpp:460
Exception
An exception class.
Definition: exceptions.hpp:25
DataFeed::Messages::Header
A header containing type information and metadata for a message.
Definition: messages.hpp:145
DataFeed
Logic for sending and receiving messages on a financial data feed.
Definition: heartbeat.hpp:28
DataFeed::Messages::PacketQueue
std::deque< Packet > PacketQueue
A type for queuing packet buffers.
Definition: messages.hpp:113
DataFeed::LOB::LimitOrderBook::clear
void clear()
Clear all the orders in the book.
Definition: limit_order_book.hpp:55
DataFeed::Receiver::is_session_active
bool is_session_active() const
Return whether the trading session is active.
Definition: receiver.hpp:220
DataFeed::side_to_LOB_side
constexpr LOB::Side side_to_LOB_side(Side side)
Convert a side character to a LOB side value.
Definition: messages.hpp:82
DataFeed::Receiver::Receiver
Receiver(asio::io_context &io_context, const asio::ip::address &listen, const asio::ip::address &group, uint16_t port, Handler &handler_, bool reuse_enabled=true)
Initialize a new data feed receiver.
Definition: receiver.hpp:183
DataFeed::Receiver::get_handler
const Handler & get_handler() const
Return the handler connected to this feed receiver.
Definition: receiver.hpp:226
DataFeed::Messages::AddOrder
A message that indicates a limit order was added to the book.
Definition: messages.hpp:259
DataFeed::Messages::Packet
std::array< char, 40 > Packet
Definition: messages.hpp:110