CBOE Emulator  1.0
sender.hpp
1 // A class for multi-casting limit order book depth messages.
2 // Copyright 2020 Christian Kauten
3 //
4 // Author: Christian Kauten (kautenja@auburn.edu)
5 //
6 // This program is free software: you can redistribute it and/or modify
7 // it under the terms of the GNU General Public License as published by
8 // the Free Software Foundation, either version 3 of the License, or
9 // (at your option) any later version.
10 // This program is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 // You should have received a copy of the GNU General Public License
15 // along with this program. If not, see <http://www.gnu.org/licenses/>.
16 //
17 
18 #ifndef DATA_FEED_SENDER_HPP
19 #define DATA_FEED_SENDER_HPP
20 
21 #include <functional>
22 #include <utility>
23 #include <asio.hpp>
24 #include "exceptions.hpp"
25 #include "messages.hpp"
26 
28 namespace DataFeed {
29 
31 class Sender {
32  private:
34  asio::ip::udp::endpoint endpoint;
36  asio::ip::udp::socket socket;
38  SequenceNumber sequence = 1;
40  Messages::PacketQueue output_buffer;
42  uint32_t output_buffer_size = 0;
43 
45  void send_message() {
46  socket.async_send_to(
47  asio::buffer(output_buffer.front(), sizeof(Messages::Packet)),
48  endpoint,
49  [this](const std::error_code& error, std::size_t) {
50  if (error) // an error occurred while sending the message
51  throw Exception("DataFeed::Sender::send - " + error.message());
52  // remove the sent message from the queue
53  output_buffer.pop_front();
54  // send another message if there is more work
55  if (--output_buffer_size) send_message();
56  }
57  );
58  }
59 
60  public:
69  asio::io_context& context,
70  const asio::ip::address& group,
71  uint16_t port
72  ) : endpoint(group, port), socket(context, endpoint.protocol()) { }
73 
78  inline void set_ttl(uint32_t ttl) {
79  socket.set_option(asio::ip::multicast::hops(ttl));
80  }
81 
83  inline void enable_loopback() {
84  socket.set_option(asio::ip::multicast::enable_loopback(true));
85  }
86 
91  inline void set_outbound_interface(const asio::ip::address_v4& interface) {
92  socket.set_option(asio::ip::multicast::outbound_interface(interface));
93  }
94 
99  inline SequenceNumber get_sequence() const { return sequence; }
100 
105  inline asio::ip::address group() const { return endpoint.address(); }
106 
111  inline uint16_t port() const { return endpoint.port(); }
112 
117  inline uint32_t get_output_buffer_size() const { return output_buffer_size; }
118 
124  template <class T, typename ...Args>
125  inline void send(Args && ...args) {
126  // put an empty packet on the back of the queue
127  output_buffer.emplace_back();
128  // initialize the message data on the buffer using forwarded arguments
129  // and placement new to construct the object in the buffer memory
130  new (&output_buffer.back()) T{std::forward<Args>(args)..., sequence++, Clock::get_time()};
131  // increment the number of messages available now the buffer is ready
132  // start the write event loop if this is the first message
133  if (++output_buffer_size == 1) send_message();
134  }
135 };
136 
137 } // namespace DataFeed
138 
139 #endif // DATA_FEED_SENDER_HPP
DataFeed::Sender::port
uint16_t port() const
Return the port this sender is running at.
Definition: sender.hpp:111
DataFeed::Sender
A class for multi-casting depth of book messages from a LOB::LimitOrderBook.
Definition: sender.hpp:31
DataFeed::Sender::get_sequence
SequenceNumber get_sequence() const
Return the sequence number at this sender.
Definition: sender.hpp:99
DataFeed::Sender::Sender
Sender(asio::io_context &context, const asio::ip::address &group, uint16_t port)
Initialize a new data feed multi-cast server.
Definition: sender.hpp:68
DataFeed::SequenceNumber
uint32_t SequenceNumber
A type for sequence numbers.
Definition: messages.hpp:36
Clock::get_time
TimeStamp get_time()
Return the time in nanoseconds as a 64-bit unsigned integer (TimeStamp).
Definition: clock.hpp:36
DataFeed
Logic for sending and receiving messages on a financial data feed.
Definition: heartbeat.hpp:28
DataFeed::Sender::get_output_buffer_size
uint32_t get_output_buffer_size() const
Return the current size of the output buffer (number of messages).
Definition: sender.hpp:117
DataFeed::Messages::PacketQueue
std::deque< Packet > PacketQueue
A type for queuing packet buffers.
Definition: messages.hpp:113
DataFeed::Sender::set_ttl
void set_ttl(uint32_t ttl)
Set the TTL hops for messages sent by the sender.
Definition: sender.hpp:78
DataFeed::Sender::send
void send(Args &&...args)
Send a message.
Definition: sender.hpp:125
DataFeed::Sender::group
asio::ip::address group() const
Return the group that this sender is sending to.
Definition: sender.hpp:105
DataFeed::Sender::enable_loopback
void enable_loopback()
Enable loop-back for the sender.
Definition: sender.hpp:83
DataFeed::Sender::set_outbound_interface
void set_outbound_interface(const asio::ip::address_v4 &interface)
Set the outbound interface for the sender.
Definition: sender.hpp:91
DataFeed::Messages::Packet
std::array< char, 40 > Packet
Definition: messages.hpp:110