CBOE Emulator  1.0
connection.hpp
1 // A single connection to the order entry server.
2 // Copyright 2020 Christian Kauten
3 //
4 // Author: Christian Kauten (kautenja@auburn.edu)
5 //
6 // This program is free software: you can redistribute it and/or modify
7 // it under the terms of the GNU General Public License as published by
8 // the Free Software Foundation, either version 3 of the License, or
9 // (at your option) any later version.
10 // This program is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 // You should have received a copy of the GNU General Public License
15 // along with this program. If not, see <http://www.gnu.org/licenses/>.
16 //
17 
18 #ifndef ORDER_ENTRY_CONNECTION_HPP
19 #define ORDER_ENTRY_CONNECTION_HPP
20 
21 #include <functional>
22 #include <iostream>
23 #include <memory>
24 #include <string>
25 #include <asio.hpp>
26 #include "exceptions.hpp"
27 #include "limit_order_book/limit_order_book.hpp"
28 #include "messages.hpp"
29 #include "authorizer.hpp"
30 #include "system_account.hpp"
31 
32 using asio::ip::tcp;
33 
35 namespace OrderEntry {
36 
38 class Connection : public std::enable_shared_from_this<Connection> {
39  private:
41  tcp::socket socket;
43  Authorizer<Connection>& authorizer;
45  LOB::LimitOrderBook& book;
47  SystemAccount<Connection>* account = nullptr;
49  Messages::PacketQueue input_buffer;
51  Messages::PacketQueue output_buffer;
53  uint32_t output_buffer_size = 0;
55  SequenceNumber sequence = 0;
57  asio::steady_timer timer;
58 
65  Connection(
66  asio::io_context& context,
67  Authorizer<Connection>& authorizer_,
68  LOB::LimitOrderBook& book_
69  ) : socket(context), authorizer(authorizer_), book(book_), timer(context) { }
70 
72  void logout() {
73  // user is already logged out
74  if (account == nullptr) return;
75  // mark the account as disconnected
76  account->is_connected = false;
77  // remove the handler from the account
78  account->handler = nullptr;
79  // remove the account from the connection
80  account = nullptr;
81  }
82 
84  inline void write_buffer() {
85  asio::async_write(
86  socket,
87  asio::buffer(output_buffer.front(), sizeof(Messages::Packet)),
88  [this](const std::error_code& error, std::size_t) {
89  if (error) {
90  std::cout << "OrderEntry::Connection::write_buffer - " << error << std::endl;
91  return;
92  }
93  // remove the sent message from the queue
94  output_buffer.pop_front();
95  // send another message if there is more work
96  if (--output_buffer_size) write_buffer();
97 
98  }
99  );
100  }
101 
107  template <class T, typename ...Args>
108  inline void send(Args && ...args) {
109  // put an empty packet on the back of the queue
110  output_buffer.emplace_back();
111  // initialize the message data on the buffer using forwarded arguments
112  new (&output_buffer.back()) T{sequence++, std::forward<Args>(args)...};
113  // increment the number of messages available now the buffer is ready
114  // start the write event loop if this is the first message
115  if (++output_buffer_size == 1) write_buffer();
116  }
117 
119  void read_buffer() {
120  auto self(shared_from_this());
121  input_buffer.emplace_back();
122  asio::async_read(
123  socket,
124  asio::buffer(input_buffer.back(), sizeof(Messages::Packet)),
125  [this, self](const std::error_code& error, std::size_t) {
126  if (error) {
127  if (error == asio::error::eof) { // connection closed
128  // logout the account and close the connection
129  logout();
130  return;
131  }
132  // an unexpected error occurred, throw the error
133  throw Exception("Server::read_buffer - " + error.message());
134  }
135  // copy the message from the front of the queue and pop it off
136  Messages::Packet message = input_buffer.front();
137  input_buffer.pop_front();
138  // read another message from the client
139  read_buffer();
140  // unwrap and handle the data using the template ID
141  switch (reinterpret_cast<Messages::Header&>(message).uid) {
142  case Messages::MessageID::LoginRequest: { handle(reinterpret_cast<Messages::LoginRequest&>(message)); break; }
143  case Messages::MessageID::LogoutRequest: { handle(reinterpret_cast<Messages::LogoutRequest&>(message)); break; }
144  case Messages::MessageID::OrderRequest: { handle(reinterpret_cast<Messages::OrderRequest&>(message)); break; }
145  case Messages::MessageID::CancelRequest: { handle(reinterpret_cast<Messages::CancelRequest&>(message)); break; }
146  case Messages::MessageID::ReplaceRequest: { handle(reinterpret_cast<Messages::ReplaceRequest&>(message)); break; }
147  case Messages::MessageID::PurgeRequest: { handle(reinterpret_cast<Messages::PurgeRequest&>(message)); break; }
148  default: { handle(reinterpret_cast<Messages::Header&>(message)); break; }
149  }
150  }
151  );
152  }
153 
158  void handle(const Messages::LoginRequest& request) {
159  // convert the user-name / password character arrays to strings
160  auto username = request.username_string();
161  auto password = request.password_string();
162  if (is_logged_in()) { // the client has already logged into this connection
163  // just format a response, don't de-authorize the connection
164  send<Messages::LoginResponse>(Messages::LoginResponseStatus::AlreadyAuthorized);
165  } else if (not authorizer.is_valid(username, password)) { // invalid credentials
166  // just format a response, no other work needs done
167  send<Messages::LoginResponse>(Messages::LoginResponseStatus::NotAuthorized);
168  } else if (authorizer.get_account(username)->is_connected) { // session in use on other connection
169  // just format a response, no other work needs done
170  send<Messages::LoginResponse>(Messages::LoginResponseStatus::SessionInUse);
171  } else { // valid credentials
172  // set the account for this connection based on the user-name
173  account = authorizer.get_account(username);
174  // set the trade handler for the account to this connection
175  // this ensures that `this->trade` is called by the account handler
176  // when trades connected to the account occur
177  account->handler = this;
178  // set the connected flag for account to prevent additional sessions
179  // being created on different connections
180  account->is_connected = true;
181  // format the login response message for the client
182  send<Messages::LoginResponse>(Messages::LoginResponseStatus::Accepted);
183  }
184  }
185 
190  void handle(const Messages::LogoutRequest& request) {
191  // the client is not authorized to begin with
192  if (not is_logged_in()) {
193  send<Messages::LogoutResponse>(Messages::LogoutReason::ProtocolViolation);
194  return;
195  }
196  // logout the user and format a response
197  logout();
198  send<Messages::LogoutResponse>(Messages::LogoutReason::UserRequested);
199  }
200 
205  void handle(const Messages::OrderRequest& request) {
206  if (not is_logged_in()) { // the client is not authorized to send
207  send<Messages::OrderResponse>(Messages::ORDER_ID_REJECTED, Messages::OrderStatus::Rejected);
208  return;
209  }
210  // handle the order
211  if (request.price == Messages::ORDER_PRICE_MARKET) { // market order
212  book.market(account, side_to_LOB_side(request.side), request.quantity);
213  send<Messages::OrderResponse>(Messages::ORDER_ID_MARKET, Messages::OrderStatus::Accepted);
214  } else { // limit order
215  auto uid = book.limit(account, side_to_LOB_side(request.side), request.quantity, request.price);
216  send<Messages::OrderResponse>(uid, Messages::OrderStatus::Accepted);
217  }
218  }
219 
224  void handle(const Messages::CancelRequest& request) {
225  // the client is not authorized to send
226  if (not is_logged_in()) {
227  send<Messages::CancelResponse>(request.order_id, Messages::CancelStatus::Rejected);
228  return;
229  }
230  // the order doesn't exist
231  if (not book.has(request.order_id)) {
232  send<Messages::CancelResponse>(request.order_id, Messages::CancelStatus::Rejected);
233  return;
234  }
235  // order doesn't belong to this account
236  if (static_cast<SystemAccount<Connection>*>(book.get(request.order_id).account)->username.compare(account->username) != 0) {
237  send<Messages::CancelResponse>(request.order_id, Messages::CancelStatus::Rejected);
238  return;
239  }
240  // cancel the order and send the response
241  book.cancel(request.order_id);
242  send<Messages::CancelResponse>(request.order_id, Messages::CancelStatus::Accepted);
243  }
244 
249  void handle(const Messages::ReplaceRequest& request) {
250  // the client is not authorized to send
251  if (not is_logged_in()) {
252  send<Messages::ReplaceResponse>(request.order_id, Messages::ORDER_ID_REJECTED, Messages::ReplaceStatus::Rejected);
253  return;
254  }
255  // check if the replaced order still exists (it may have filled already)
256  OrderID canceled = Messages::ORDER_ID_REJECTED;
257  if (book.has(request.order_id)) {
258  // order doesn't belong to this account
259  if (static_cast<SystemAccount<Connection>*>(book.get(request.order_id).account)->username.compare(account->username) != 0) {
260  send<Messages::ReplaceResponse>(request.order_id, Messages::ORDER_ID_REJECTED, Messages::ReplaceStatus::Rejected);
261  return;
262  }
263  // cancel the order and send the response
264  book.cancel(request.order_id);
265  canceled = request.order_id;
266  }
267  // place the new order
268  OrderID uid = Messages::ORDER_ID_REJECTED;
269  switch (request.side) {
270  case Side::Sell: {
271  uid = book.limit_sell(account, request.quantity, request.price);
272  break;
273  }
274  case Side::Buy: {
275  uid = book.limit_buy(account, request.quantity, request.price);
276  break;
277  }
278  }
279  // send the acceptance response
280  send<Messages::ReplaceResponse>(canceled, uid, Messages::ReplaceStatus::Accepted);
281  }
282 
287  void handle(const Messages::PurgeRequest& request) {
288  // the client is not authorized to send
289  if (not is_logged_in()) {
290  send<Messages::PurgeResponse>(Messages::PurgeStatus::Rejected);
291  return;
292  }
293  // cancel all the orders. it's important to use a while loop here
294  // instead of a for loop because calls to cancel will change the state
295  // of the orders set and cause a segmentation fault if a for loop is
296  // used in some cases
297  while (account->orders.size()) {
298  book.cancel((*account->orders.begin())->uid);
299  }
300  // send the purge response to the client
301  send<Messages::PurgeResponse>(Messages::PurgeStatus::Accepted);
302  }
303 
308  void handle(const Messages::Header& header) {
309  std::cout << "OrderEntry::Connection::handle(Header) - " << header << std::endl;
310  // logout the user and send a response
311  logout();
312  send<Messages::LogoutResponse>(Messages::LogoutReason::ProtocolViolation);
313  }
314 
315  public:
317  typedef std::shared_ptr<Connection> Pointer;
318 
326  static Pointer create(
327  asio::io_context& context,
328  Authorizer<Connection>& authorizer,
329  LOB::LimitOrderBook& book
330  ) { return Pointer(new Connection(context, authorizer, book)); }
331 
336  inline tcp::socket& get_socket() { return socket; }
337 
342  inline bool is_logged_in() const { return account != nullptr; }
343 
348  inline uint32_t get_output_buffer_size() const { return output_buffer_size; }
349 
354  inline SequenceNumber get_sequence() const { return sequence; }
355 
357  inline void start() { read_buffer(); }
358 
367  inline void trade(
368  OrderID order_id,
369  Price price,
370  Quantity size,
371  Quantity leaves_size,
372  Side side
373  ) { send<Messages::TradeResponse>(order_id, price, size, leaves_size, side); }
374 };
375 
376 } // namespace OrderEntry
377 
378 #endif // ORDER_ENTRY_CONNECTION_HPP
OrderEntry::Connection::create
static Pointer create(asio::io_context &context, Authorizer< Connection > &authorizer, LOB::LimitOrderBook &book)
Create a shared pointer to a new connection.
Definition: connection.hpp:326
OrderEntry
Logic for sending/receiving application messages in a financial market.
Definition: authorizer.hpp:26
OrderEntry::Messages::Packet
std::array< char, 40 > Packet
Definition: messages.hpp:146
OrderEntry::SystemAccount::is_connected
bool is_connected
whether the account is currently logged in
Definition: system_account.hpp:43
OrderEntry::Connection
A TCP connection to a client device for providing direct market access.
Definition: connection.hpp:38
OrderEntry::Authorizer
Logic for looking up and validating credentials for new connections.
Definition: authorizer.hpp:32
OrderEntry::SequenceNumber
uint32_t SequenceNumber
A type for sequence numbers.
Definition: messages.hpp:36
DataFeed::OrderID
uint64_t OrderID
A type for order IDs.
Definition: messages.hpp:39
OrderEntry::SystemAccount::handler
Handler * handler
the connection for this account
Definition: system_account.hpp:41
OrderEntry::Connection::Pointer
std::shared_ptr< Connection > Pointer
a shared pointer type for this connection object
Definition: connection.hpp:317
OrderEntry::LOB::LimitOrderBook
An order book for managing Limit / Order objects in a continuous double auction.
Definition: limit_order_book.hpp:40
DataFeed::side_to_LOB_side
constexpr LOB::Side side_to_LOB_side(Side side)
Convert a side character to a LOB side value.
Definition: messages.hpp:82
OrderEntry::Messages::PacketQueue
std::deque< Packet > PacketQueue
A type for queuing packet buffers.
Definition: messages.hpp:149
OrderEntry::OrderID
uint64_t OrderID
A type for order IDs.
Definition: messages.hpp:139
OrderEntry::Connection::is_logged_in
bool is_logged_in() const
Return true if the connection is authorized, false otherwise.
Definition: connection.hpp:342
OrderEntry::Quantity
uint32_t Quantity
A type for order quantities.
Definition: messages.hpp:133
OrderEntry::Connection::get_output_buffer_size
uint32_t get_output_buffer_size() const
Return the size of the output buffer (number of queued messages).
Definition: connection.hpp:348
OrderEntry::Price
uint64_t Price
A type for order prices.
Definition: messages.hpp:136
OrderEntry::SystemAccount
A subclass of the LOB::Account that manages client state on the market server.
Definition: system_account.hpp:35
OrderEntry::Side
Side
The side of an order.
Definition: messages.hpp:71
OrderEntry::Connection::start
void start()
Start the connection.
Definition: connection.hpp:357
OrderEntry::Connection::get_sequence
SequenceNumber get_sequence() const
Return the sequence number at this sender.
Definition: connection.hpp:354
OrderEntry::Connection::trade
void trade(OrderID order_id, Price price, Quantity size, Quantity leaves_size, Side side)
Send a trade message to the client.
Definition: connection.hpp:367
OrderEntry::Connection::get_socket
tcp::socket & get_socket()
Return the socket this connection is using.
Definition: connection.hpp:336