#ifdef _WIN32 #ifndef _WIN32_WINNT #define _WIN32_WINNT 0x0501 #endif #if _WIN32_WINNT < _WIN32_WINNT_WINXP #error "works on win xp of later" #endif #ifdef _MSC_VER #define _CRT_SECURE_NO_WARNINGS #endif #endif #define __STDC_FORMAT_MACROS #include #include #include #include #include #include #include #include #ifdef _MSC_VER #include #else #include #endif #ifdef _WIN32 #include #include #include typedef int socklen_t; #ifdef _MSC_VER #pragma comment (lib, "Ws2_32.lib") #else #pragma message("build with \"-lws2_32\" or \"-lwsock32\"") #endif #define AGAIN WSAEWOULDBLOCK #define WOULDBLOCK WSAEWOULDBLOCK #define INPROGRESS WSAEWOULDBLOCK #define SOL_TCP IPPROTO_TCP #else #include #include #include #include #include #include #include #include #define closesocket close #define AGAIN EAGAIN #define WOULDBLOCK EWOULDBLOCK #define INPROGRESS EINPROGRESS #endif typedef char ascii; typedef int8_t int1; typedef int16_t int2; typedef int32_t int4; typedef int64_t int8; typedef int64_t dec2; typedef int64_t dec8; typedef int32_t time4; typedef uint64_t time8; time8 utime() { #ifdef _WIN32 SYSTEMTIME system_time; FILETIME file_time; ULARGE_INTEGER integer, integer0; GetSystemTime(&system_time); SystemTimeToFileTime(&system_time, &file_time); integer.HighPart = file_time.dwHighDateTime; integer.LowPart = file_time.dwLowDateTime; system_time.wYear = 1970; system_time.wDay = 1; system_time.wMonth = 1; system_time.wHour = 0; system_time.wMinute = 0; system_time.wSecond = 0; system_time.wMilliseconds = 0; SystemTimeToFileTime(&system_time, &file_time); integer0.HighPart = file_time.dwHighDateTime; integer0.LowPart = file_time.dwLowDateTime; return (time8)((integer.QuadPart - integer0.QuadPart) / 1000); #else struct timeval tv; struct timezone tz; gettimeofday(&tv, &tz); return (time8)tv.tv_sec * 1000000 + (time8)tv.tv_usec; #endif } void local_time(int *year, int *month, int *day, int *hour, int *minute, int *second, int *millisecond, int *microsecond) { #ifdef _WIN32 SYSTEMTIME system_time; GetLocalTime(&system_time); if (year != NULL) *year = (int)system_time.wYear; if (month != NULL) *month = (int)system_time.wMonth; if (day != NULL) *day = (int)system_time.wDay; if (hour != NULL) *hour = (int)system_time.wHour; if (minute != NULL) *minute = (int)system_time.wMinute; if (second != NULL) *second = (int)system_time.wSecond; if (millisecond != NULL) *millisecond = system_time.wMilliseconds; if (microsecond != NULL) *microsecond = 0; #else time8 time8; time_t time4; struct tm *tm; time8 = utime(); time4 = (time_t)(time8 / 1000000); tm = localtime(&time4); if (year != NULL) *year = (int)tm->tm_year; if (month != NULL) *month = (int)tm->tm_mon + 1; if (day != NULL) *day = (int)tm->tm_mday; if (hour != NULL) *hour = (int)tm->tm_hour; if (minute != NULL) *minute = (int)tm->tm_min; if (second != NULL) *second = (int)tm->tm_sec; if (millisecond != NULL) *millisecond = (int)(time8 / 1000 % 1000); if (microsecond != NULL) *microsecond = (int)(time8 % 1000); #endif } void _vlog(FILE *file, const char level[], const char format[], va_list args) { int hour, minute, second, millisecond; local_time(NULL, NULL, NULL, &hour, &minute, &second, &millisecond, NULL); fprintf(file, "[%02d:%02d:%02d.%03d] [%s] ", hour, minute, second, millisecond, level); vfprintf(file, format, args); fputc('\n', file); } #ifdef __GNUC__ void _log(FILE *file, const char level[], const char format[], ...) __attribute__((format(printf, 3, 4))); #endif void _log(FILE *file, const char level[], const char format[], ...) { va_list arg_list; va_start(arg_list, format); _vlog(file, level, format, arg_list); va_end(arg_list); } #define debug(format, ...) _log(stdout, "debug", format, ##__VA_ARGS__) #define info(format, ...) _log(stdout, "info", format, ##__VA_ARGS__) #define warning(format, ...) _log(stderr, "warning", format, ##__VA_ARGS__) #define error(format, ...) _log(stderr, "error", format, ##__VA_ARGS__) void print_error(const char message[]) { #ifdef _WIN32 LPSTR buffer = NULL; if (FormatMessageA(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM, NULL, WSAGetLastError(), MAKELANGID(LANG_ENGLISH, SUBLANG_ENGLISH_US), (LPSTR)&buffer, 0, NULL) == 0) { error("%s: (%d) unknown error", message, WSAGetLastError()); return; } error("%s: (%d) %s", message, WSAGetLastError(), buffer); LocalFree(buffer); #else error("%s: (%d) %s", message, errno, strerror(errno)); #endif } int get_error() { #ifdef _WIN32 return WSAGetLastError(); #else return errno; #endif } int get_addr_info(const char host[], struct sockaddr_in *sock_addr) { const struct addrinfo hints = { 0, AF_INET, SOCK_DGRAM }; struct addrinfo *addr_info = 0, *ai; int gai_error; gai_error = getaddrinfo(host, 0, &hints, &addr_info); if (gai_error != 0) return error("Failed to resolve address \"%s\": (%d) %s", host, gai_error, gai_strerror(gai_error)), -1; for (ai = addr_info; ai != NULL; ai = ai->ai_next) { if (ai->ai_family != AF_INET) continue; memcpy(sock_addr, ai->ai_addr, ai->ai_addrlen); break; } freeaddrinfo(addr_info); if (ai == NULL) return -1; return 0; } int print_sock_info(int sock) { struct sockaddr_in self_addr, peer_addr; char address[INET_ADDRSTRLEN]; socklen_t self_len = sizeof(self_addr), peer_len = sizeof(peer_addr); if (getsockname(sock, (struct sockaddr *)&self_addr, &self_len) == -1) return print_error("getsockname failed"), -1; if (getpeername(sock, (struct sockaddr *)&peer_addr, &peer_len) == -1) return print_error("getpeername failed"), -1; strcpy(address, inet_ntoa(self_addr.sin_addr)); info("Socket %s:%hu is connected to %s:%hu", address, ntohs(self_addr.sin_port), inet_ntoa(peer_addr.sin_addr), ntohs(peer_addr.sin_port)); return 0; } int setup(struct sockaddr_in *sock_addr) { int sock; int flag = 1; struct linger linger = { 0, 0 }; sock = socket(AF_INET, SOCK_STREAM, 0); if (sock == -1) return print_error("socket failed"), -1; if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char *)&flag, sizeof(flag)) == -1) return print_error("setsockopt failed"), -1; if (setsockopt(sock, SOL_SOCKET, SO_LINGER, (char *)&linger, sizeof(linger)) == -1) return print_error("setsockopt failed"), -1; if (setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag)) == -1) return print_error("setsockopt failed"), -1; #ifdef _WIN32 if (ioctlsocket(sock, FIONBIO, (u_long *)&flag) == -1) return print_error("ioctlsocket failed"), -1; #else if (fcntl(sock, F_SETFL, O_NONBLOCK) == -1) return print_error("fcntl failed"), -1; #endif if (connect(sock, (struct sockaddr *)sock_addr, sizeof(*sock_addr)) == -1) { if (get_error() != INPROGRESS) return print_error("bind failed"), -1; } return sock; } #pragma pack(push, 1) struct frame { int2 size; int2 msgid; int8 seq; }; struct instrument { int2 source_id; int4 instrument_id; }; struct account { int4 member_id; // Идентификатор участника торгов ascii account[16]; // Идентификатор торгово-клирингового счета участника торгов ascii client_id[16]; // Идентификатор клиентского кода }; struct otccodes { ascii initiator_party[16]; // Идентификатор отправителя адресного поручения ascii ctrparty[16]; // Идентификатор получателя адресного поручения }; struct hello_msg { struct frame frame; ascii login[16]; ascii password[16]; }; #define HELLO 1 struct report_msg { struct frame frame; int2 status; char reason[128]; int2 addresses_offset; int2 addresses_count; struct gateway { int2 type; int1 ver; int1 pad0; char addresses[48]; } gateway[1]; // extendable }; #define REPORT 2 struct resend_request_msg { struct frame frame; int8 from_seq; int8 till_seq; }; #define RESEND_REQUEST 8005 struct resend_report_msg { struct frame frame; int1 status; }; #define RESEND_REPORT 8105 struct login_msg { struct frame frame; ascii login[16]; ascii password[16]; int1 reset_seq; int4 heartbeat_ms; }; #define LOGIN 8001 struct logout_msg { struct frame frame; ascii login[16]; }; #define LOGOUT 8002 struct logon_msg { struct frame frame; int8 last_seq; int8 expected_seq; ascii system_id[8]; }; #define LOGON 8101 struct reject_msg { struct frame frame; int8 ref_seq; int2 ref_msgid; int2 reason; char message[33]; }; #define REJECT 8102 struct heartbeat_msg { struct frame frame; }; #define HEARTBEAT 8103 struct user_header { ascii clorder_id[20]; }; struct gate_header { time8 system_time; ascii clorder_id[20]; int2 source_id; ascii user_id[16]; }; struct add_order_msg { struct frame frame; struct user_header user_header; struct instrument instrument; // Компонент идентификации торгового инструмента int1 dir; // Направление поручения. int1 type; // Тип поручения. int1 time_in_force; // Время действия поручения. int1 passive_only; // Зарезервированное поле. Заполняется нулевым байтом int1 auto_cancel; // Режим автоматического снятия при разрыве соединения. int1 pad; // Зарезервированное поле. Заполняется нулевым байтом int2 routing_instruction; // Алгоритма маршрутизации остатка поручения. int2 routing_dest; // Идентификатор торговой площадки исполнения (см. 3.8.1.1.1) int4 amount; // Объем поручения в лотах 40 amount_extra int4 Объем видимой части поручения в лотах. Должно быть заполнено только при type=ICEBERG int4 amount_extra; // Объем видимой части поручения в лотах. Должно быть заполнено только при type=ICEBERG dec8 price; // Цена. Для репо указывается годовая доходность в процентах dec8 price_extra; // Дополнительная цена. Для репо может быть указана цена сделки int8 flags; // Параметры, зависящие от рынка. В текущей версии должно быть заполнено 0x0 time8 time_valid; // Последний срок, когда поручение может быть принято торговой платформой time4 date_expire; // Дата и время автоматического снятия поручения. В текущей версии должно быть заполнено нулем struct account account; struct otccodes parties; char comment[24]; // Клиентский комментарий к поручению ascii extra_ref[12]; // Дополнительный идентификатор заявки ascii extra1[4]; // Дополнительное текстовое поле int2 prime_exchange; // Основной пул ликвидности и пул ликвидности для маршрутизации остатка. (см. 3.8.1.1) int4 match_ref; // Идентификатор для сведения адресного поручения }; #define ADD_ORDER 101 struct reject_report_msg { struct frame frame; struct gate_header gate_header; int2 market; // Торговая площадка отклонившая запрос int2 reason; // Код причины отклонения char message[33]; // Описание причины отклонения int8 extra_data0; // Идентификатор поручения, указан при отклонении запроса на снятие по order_id }; #define REJECT_REPORT 201 struct add_report_msg { struct frame frame; struct gate_header gate_header; struct instrument instrument; // Компонент идентификации торгового инструмента int1 dir; // Направление поручения/заявки int1 type; // Тип поручения/заявки int1 time_in_force; // Время действия поручения int1 passive_only; // Режим постановки int1 auto_cancel; // Режим автоматического снятия при разрыве соединения int1 pad; // Зарезервированное поле, заполняется нулевым байтом int2 routing_instruction; // Алгоритм маршрутизации int2 routing_dest; // Идентификатор торговой площадки int4 amount; // Снимаемый объем поручения/заявки int4 amount_extra; // Объем видимой части поручения/заявки dec8 price; // Цена. Для репо указывается годовая доходность в процентах dec8 price_extra; // Цена сделки. Заполняется только для репо int8 flags; // Параметры, зависящие от рынка (значения см. в 3.5) time4 date_expire; // Дата и время автоматического снятия поручения time8 time_valid; // Последний срок, когда поручение могло быть принято торговой платформой struct account account; // Компонент идентификации клиента, подавшего заявку struct otccodes parties; // Компонент идентификации сторон адресной заявки int8 order_id; // Идентификатор поручения, присвоенный торговой платформой int8 orig_orderid; // Идентификатор исходного поручения, заполняется при изменении параметров поручения клиентом ascii exch_orderid[20]; // Идентификатор заявки, присвоенный биржей int1 price_entry; // Номер ценового уровня по отношению к лучшему, на который попала заявка int1 pad1; // Зарезервированное поле. Заполняется нулем char comment[24]; // Клиентский комментарий к поручению ascii extra_ref[12]; // Дополнительный идентификатор заявки ascii extra1[4]; // Дополнительное текстовое поле int2 prime_exchange; // Основная торговая площадка int4 match_ref; // Идентификатор для сведения адресного поручения int2 orig_market; // Пул ликвидности, указанный клиентом при подаче }; #define ADD_REPORT 212 struct cancel_report_msg { struct frame frame; struct gate_header gate_header; struct instrument instrument; // Компонент идентификации торгового инструмента int1 dir; // Направление поручения int1 type; // Тип поручения int4 amount; // Объем поручения int4 amount_rest; // Несведенный объем поручения dec8 price; // Цена. Для репо указывается годовая доходность в процентах dec8 price_extra; // Запрашиваемая цена. Для репо может быть указывается цена сделки int8 flags; // Параметры, зависящие от рынка struct account account; // Компонент идентификации клиента, подавшего поручение int8 order_id; // Идентификатор поручения, присвоенный торговой платформой ascii exch_orderid[20]; // Идентификатор заявки, присвоенный бирже int2 cancel_reason; // Причина снятия поручения. ascii orig_clorder_id[20]; // Опциональный клиентский идентификатор заявки на снятие }; #define CANCEL_REPORT 214 struct deal { dec8 deal_price; // Цена сделки int8 deal_id; // Идентификатор сделки, присвоенный биржей int4 amount; // Объем сделки }; struct execution_msg { struct frame frame; struct gate_header gate_header; struct instrument instrument; // Компонент идентификации торгового инструмента int1 dir; // Направление поручения int1 type; // Тип поручения/заявки dec8 price; // Цена поручения dec8 price_extra; // Запрашиваемая цена int8 flags; // Параметры, зависящие от рынка int2 exec_market; // Торговая площадка, на которой были совершены сделки struct account account; // Компонент идентификации клиента, подавшего поручение struct otccodes parties; // Компонент идентификации сторон адресного поручения int8 order_id; // Идентификатор поручения, присвоенный торговой платформой ascii exch_orderid[20]; // Идентификатор заявки, присвоенный биржей int4 amount_rest; // Активный остаток поручения int2 deals_offset; // Смещение первой записи группы deals относительно начала данного поля int2 deals_count; // Количество записей группы deals в данном сообщении struct deal deal[1]; // extendable }; #define EXECUTION 207 #pragma pack(pop) // timer typedef uint16_t timer_id_t; struct timer_list { struct timer_list *next; timer_id_t timer_id; time8 time; int (*handle)(void *data); void *data; }; struct timer_list *timer_list_add(struct timer_list *timer_list, timer_id_t timer_id, time8 time, int (*handle)(void *data), void *data) { if (timer_list == NULL || time < timer_list->time) { struct timer_list *new_timer_list; new_timer_list = malloc(sizeof(*timer_list)); new_timer_list->next = timer_list; new_timer_list->timer_id = timer_id; new_timer_list->time = time; new_timer_list->handle = handle; new_timer_list->data = data; return new_timer_list; } else { timer_list->next = timer_list_add(timer_list->next, timer_id, time, handle, data); return timer_list; } } struct timer_list *timer_list_remove(struct timer_list *timer_list, timer_id_t timer_id) { if (timer_list == NULL) return NULL; else if (timer_list->timer_id == timer_id) { struct timer_list *timer_list_next = timer_list->next; free(timer_list); return timer_list_next; } else { timer_list->next = timer_list_remove(timer_list->next, timer_id); return timer_list; } } struct timer_list * volatile timer_list = NULL; timer_id_t new_timer() { static timer_id_t timer_id = 1; return timer_id++; } void set_timer(timer_id_t timer_id, time8 time, int (*handle)(void *data), void *data) { timer_list = timer_list_remove(timer_list, timer_id); timer_list = timer_list_add(timer_list, timer_id, time, handle, data); } void drop_timer(timer_id_t timer_id) { timer_list = timer_list_remove(timer_list, timer_id); } struct timeval *get_timeout(struct timeval *tv) { time8 time, timeout = 0; if (timer_list == NULL) return NULL; time = utime(); if (time < timer_list->time) timeout = timer_list->time - time; tv->tv_sec = (uint32_t)(timeout / 1000000); tv->tv_usec = (uint32_t)(timeout % 1000000); return tv; } int process_timers() { time8 time; time = utime(); while (timer_list != NULL && timer_list->time <= time) { int (*handle)(void *data); void *data; handle = timer_list->handle; data = timer_list->data; timer_list = timer_list_remove(timer_list, timer_list->timer_id); if (handle(data) == -1) return -1; } return 0; } // storage struct storage_list { struct storage_list *next; struct frame msg; // Extendible }; struct storage_list *storage_list = NULL; struct storage_list *storage_list_add(struct storage_list *storage_list, struct frame *msg) { if (storage_list == NULL || msg->seq < storage_list->msg.seq) { struct storage_list *next = storage_list; storage_list = malloc(sizeof(struct storage_list) + msg->size); storage_list->next = next; memcpy(&storage_list->msg, msg, sizeof(*msg) + msg->size); } else if (msg->seq > storage_list->msg.seq) storage_list->next = storage_list_add(storage_list->next, msg); return storage_list; } struct storage_list *storage_list_pop(struct storage_list *storage_list) { struct storage_list *next = storage_list->next; free(storage_list); return next; } int store_message(struct frame *msg) { storage_list = storage_list_add(storage_list, msg); return 0; } int storage_empty() { return storage_list == NULL; } struct frame *top_message() { return &storage_list->msg; } void pop_message() { storage_list = storage_list_pop(storage_list); } // session struct params { int4 member_id; ascii account[16]; ascii client_id[16]; char host[48]; unsigned short port; ascii login[16]; ascii password[16]; int8 seq; }; struct session { int sock; char is_connecting; char *read_buffer; size_t read_buffer_size; size_t read_buffer_capacity; char *write_buffer; size_t write_buffer_offset; size_t write_buffer_size; size_t write_buffer_capacity; struct sockaddr_in sock_addr; int4 member_id; ascii account[16]; ascii client_id[16]; ascii login[16]; ascii password[16]; timer_id_t self_idle_timer_id; timer_id_t server_idle_timer_id; timer_id_t application_timer_id; time8 idle_timeout; // microseconds int8 self_seq; int8 next_seq; int8 next_online_seq; char is_restoring; }; int session_send(struct session *session) { while (session->write_buffer_size > 0) { int sent_size; if (session->write_buffer_offset + session->write_buffer_size <= session->write_buffer_capacity) sent_size = send(session->sock, session->write_buffer + session->write_buffer_offset, session->write_buffer_size, 0); else sent_size = send(session->sock, session->write_buffer + session->write_buffer_offset, session->write_buffer_capacity - session->write_buffer_offset, 0); if (sent_size == -1) { if (get_error() == WOULDBLOCK || get_error() == AGAIN) return -1; closesocket(session->sock); session->sock = -1; return print_error("send failed"), -1; } session->write_buffer_offset += sent_size; session->write_buffer_size -= sent_size; if (session->write_buffer_offset == session->write_buffer_capacity) session->write_buffer_offset = 0; debug("Data is sended: %d bytes", sent_size); } return 0; } int session_write(struct session *session, void *msg, size_t size); int session_self_idle_timeout(struct session *session) { struct heartbeat_msg msg; msg.frame.msgid = HEARTBEAT; msg.frame.seq = 0; msg.frame.size = sizeof(msg) - sizeof(msg.frame); if (session_write(session, &msg, sizeof(msg)) == -1) { if (get_error() != AGAIN && get_error() != WOULDBLOCK) return -1; } return 0; } int session_server_idle_timeout(struct session *session) { error("Server is not responding!"); return -1; } int session_logout(struct session *session) { struct logout_msg logout_msg; if (session->sock == -1) return -1; logout_msg.frame.msgid = LOGOUT; logout_msg.frame.seq = 0; logout_msg.frame.size = sizeof(logout_msg) - sizeof(logout_msg.frame); strncpy(logout_msg.login, session->login, sizeof(logout_msg.login) / sizeof(*logout_msg.login)); if (session_write(session, &logout_msg, sizeof(logout_msg)) == -1) { if (get_error() != AGAIN && get_error() != WOULDBLOCK) return -1; } return 0; } int session_write(struct session *session, void *buf, size_t size) { struct frame *msg = buf; info("Outgoing message: seq=%"PRId64", msgid=%hd", msg->seq, msg->msgid); if (session->write_buffer_size + size > session->write_buffer_capacity) { void *buffer; session->write_buffer_capacity += 65536; buffer = malloc(session->write_buffer_capacity); if (session->write_buffer_offset + session->write_buffer_size <= session->write_buffer_capacity) memcpy(buffer, session->write_buffer + session->write_buffer_offset, session->write_buffer_size); else { memcpy(buffer, session->write_buffer + session->write_buffer_offset, session->write_buffer_capacity - session->write_buffer_offset); memcpy(buffer, session->write_buffer, session->write_buffer_offset + session->write_buffer_size - session->write_buffer_capacity); } free(session->write_buffer); session->write_buffer = buffer; session->write_buffer_offset = 0; } if (session->write_buffer_offset + session->write_buffer_size + size <= session->write_buffer_capacity) memcpy(session->write_buffer + session->write_buffer_offset + session->write_buffer_size, msg, size); else if (session->write_buffer_offset + session->write_buffer_size < session->write_buffer_capacity) { size_t first_part_size = session->write_buffer_capacity - session->write_buffer_offset - session->write_buffer_size; memcpy(session->write_buffer + session->write_buffer_offset + session->write_buffer_size, msg, first_part_size); memcpy(session->write_buffer, (char *)msg + first_part_size, size - first_part_size); } else memcpy(session->write_buffer + session->write_buffer_offset + session->write_buffer_size - session->write_buffer_capacity, msg, size); session->write_buffer_size += size; if (!session->is_connecting) { if (session_send(session) == -1) { if (get_error() != AGAIN && get_error() != WOULDBLOCK) return -1; } } set_timer(session->self_idle_timer_id, utime() + session->idle_timeout, (int (*)(void*))session_self_idle_timeout, session); // if (msg->seq != 0) // logout if there is nothing to do any more // set_timer(session->application_timer_id, utime() + session->idle_timeout * 3, (int (*)(void *))session_logout, session); return 0; } int session_init(struct session *session, struct params *params) { session->read_buffer_capacity = 65536; session->read_buffer = malloc(session->read_buffer_capacity); session->read_buffer_size = 0; session->write_buffer_capacity = 65536; session->write_buffer = malloc(session->write_buffer_capacity); session->write_buffer_size = 0; session->write_buffer_offset = 0; session->member_id = params->member_id; strncpy(session->account, params->account, sizeof(session->account) / sizeof(*session->account)); strncpy(session->client_id, params->client_id, sizeof(session->client_id) / sizeof(*session->client_id)); strncpy(session->login, params->login, sizeof(session->login) / sizeof(*session->login)); strncpy(session->password, params->password, sizeof(session->password) / sizeof(*session->password)); session->self_idle_timer_id = new_timer(); session->server_idle_timer_id = new_timer(); session->application_timer_id = new_timer(); session->idle_timeout = 1000000; // 1 second session->self_seq = 0; session->next_seq = params->seq; session->next_online_seq = params->seq; session->is_restoring = 0; session->sock = -1; if (get_addr_info(params->host, &session->sock_addr) == -1) return -1; session->sock_addr.sin_port = htons(params->port); return 0; } void session_destroy(struct session *session) { free(session->read_buffer); free(session->write_buffer); } int session_enter(struct session *session) { struct hello_msg msg; session->sock = setup(&session->sock_addr); if (session->sock == -1) return -1; session->is_connecting = 1; msg.frame.msgid = HELLO; msg.frame.seq = 0; msg.frame.size = sizeof(msg) - sizeof(msg.frame); strncpy(msg.login, session->login, sizeof(msg.login) / sizeof(*msg.login)); strncpy(msg.password, session->password, sizeof(msg.password) / sizeof(*msg.password)); if (session_write(session, &msg, sizeof(msg)) == -1) { if (get_error() != AGAIN && get_error() != WOULDBLOCK) return -1; } return 0; } int session_receive(struct session *session, size_t size) { if (session->read_buffer_capacity < size) { void *buffer; session->read_buffer_capacity = size; buffer = malloc(session->read_buffer_capacity); memcpy(buffer, session->read_buffer, session->read_buffer_size); free(session->read_buffer); session->read_buffer = buffer; } while (session->read_buffer_size < size) { int read_size; read_size = recv(session->sock, session->read_buffer + session->read_buffer_size, size - session->read_buffer_size, 0); if (read_size == -1) { if (get_error() == AGAIN || get_error() == WOULDBLOCK) return -1; closesocket(session->sock); session->sock = -1; return print_error("recv failed"), -1; } if (read_size == 0) { closesocket(session->sock); session->sock = -1; debug("Socket is closed"); return -1; } session->read_buffer_size += read_size; debug("Data is received: %d bytes", read_size); } return 0; } int session_read(struct session *session) { if (session_receive(session, sizeof(struct frame)) == -1) return -1; return session_receive(session, sizeof(struct frame) + ((struct frame *)session->read_buffer)->size); } int session_login(struct session *session) { struct login_msg login_msg; session->sock = setup(&session->sock_addr); if (session->sock == -1) return -1; session->is_connecting = 1; login_msg.frame.msgid = LOGIN; login_msg.frame.seq = 0; login_msg.frame.size = sizeof(login_msg) - sizeof(login_msg.frame); strncpy(login_msg.login, session->login, 16); strncpy(login_msg.password, session->password, 16); login_msg.reset_seq = 0; login_msg.heartbeat_ms = (int4)(session->idle_timeout / 1000); // milliseconds if (session_write(session, &login_msg, sizeof(login_msg)) == -1) { if (get_error() != AGAIN && get_error() != WOULDBLOCK) return -1; } return 0; } int session_on_reject(struct session *session, struct reject_msg *reject_msg) { error("Message (seq=%"PRId64", msgid=%hd) was rejected: %hd, %s", reject_msg->ref_seq, reject_msg->ref_msgid, reject_msg->reason, reject_msg->message); if (reject_msg->ref_msgid == LOGIN && reject_msg->reason == 1 && strcmp(reject_msg->message, "WRONG_LOGIN_TIMESTAMP") == 0) { closesocket(session->sock); session->sock = -1; set_timer(session->self_idle_timer_id, utime() + session->idle_timeout / 2, (int (*)(void *))session_login, session); // waiting for login information to be delivered into the gate return 0; } return -1; } int session_on_report(struct session *session, struct report_msg *report_msg) { int i; char host[48]; unsigned short port; info("Logon status: (%hd) %s", report_msg->status, report_msg->reason); if (report_msg->status != 0) return -1; for (i = 0; i < report_msg->addresses_count; ++i) { if (report_msg->gateway[i].type & 0x2) // drop-copy gate break; } if (i == report_msg->addresses_count) { error("Gateway was not found."); return -1; } closesocket(session->sock); session->sock = -1; if (sscanf(report_msg->gateway[i].addresses, "%[^:]:%hu", host, &port) != 2) return -1; if (get_addr_info(host, &session->sock_addr) == -1) return -1; session->sock_addr.sin_port = htons(port); if (session_login(session) == -1) return -1; return 0; } int session_resend(struct session *session, int8 seq); int session_on_logon(struct session *session, struct logon_msg *logon_msg) { session->self_seq = logon_msg->expected_seq; if (session->next_online_seq == 0) session->next_seq = session->next_online_seq = logon_msg->last_seq + 1; info("System id: %.8s", logon_msg->system_id); session_resend(session, 0); return 0; } int session_on_reject_report(struct session *session, struct reject_report_msg *reject_report_msg) { warning("Reject report: %hd, %s", reject_report_msg->reason, reject_report_msg->message); return 0; } int session_on_add_report(struct session *session, struct add_report_msg *add_report_msg) { info("Order is accepted: market=%hd, order_id=%"PRId64", account=%s, client_id=%s, comment=\"%s\"", add_report_msg->instrument.source_id, add_report_msg->order_id, add_report_msg->account.account, add_report_msg->account.client_id, add_report_msg->comment); return 0; } int session_on_cancel_report(struct session *session, struct cancel_report_msg *cancel_report_msg) { info("Order is cancelled: market=%hd, order_id=%"PRId64", account=%s, client_id=%s", cancel_report_msg->instrument.source_id, cancel_report_msg->order_id, cancel_report_msg->account.account, cancel_report_msg->account.client_id); return 0; } int session_on_execution(struct session *session, struct execution_msg *execution_msg) { info("Execution: market=%hd, order_id=%"PRId64", account=%s, client_id=%s, deals_count=%d", execution_msg->instrument.source_id, execution_msg->order_id, execution_msg->account.account, execution_msg->account.client_id, execution_msg->deals_count); return 0; } int session_resend(struct session *session, int8 seq) { struct resend_request_msg resend_request_msg; resend_request_msg.frame.msgid = RESEND_REQUEST; resend_request_msg.frame.seq = 0; resend_request_msg.frame.size = sizeof(resend_request_msg) - sizeof(resend_request_msg.frame); resend_request_msg.from_seq = seq; resend_request_msg.till_seq = 0; if (seq > 0) session->next_seq = seq; else session->next_seq = 0; session->is_restoring = 1; if (session_write(session, &resend_request_msg, sizeof(resend_request_msg)) == -1) { if (get_error() != AGAIN && get_error() != WOULDBLOCK) return -1; } return 0; } int session_on_resend_report(struct session *session, struct resend_report_msg *resend_report_msg) { static char *status[] = { "ACK", "MORE", "FINISH", "DUPLICATE_REQUEST", "UNAVAILABLE" }; info("Resend status: %s", status[resend_report_msg->status]); switch (resend_report_msg->status) { case 0: // ACK info("Resend is started."); break; case 2: // FINISH if (session->next_seq < session->next_online_seq) { case 1: // MORE if (session_resend(session, session->next_seq) == -1) return -1; info("Resend more."); break; } session->is_restoring = 0; info("Resend is finished."); break; case 3: // DUPLICATE_REQUEST warning("Duplicate resend request."); break; case 4: // UNAVAILABLE error("Resend service is unavailable."); return -1; } return 0; } int session_process_message2(struct session *session, struct frame *msg); int session_process_message(struct session *session, struct frame *msg) { // if (msg->seq != 0) // logout if there is nothing to do any more // set_timer(session->application_timer_id, utime() + session->idle_timeout * 3, (int (*)(void *))session_logout, session); info("Incoming message: seq=%"PRId64", msgid=%hd", msg->seq, msg->msgid); if (msg->seq != 0) { if (session->next_seq == 0 || msg->seq == session->next_seq) { if (session->next_online_seq == 0 || msg->seq == session->next_online_seq) { info("Incoming message (ONLINE): seq=%"PRId64", msgid=%hd", msg->seq, msg->msgid); session->next_online_seq = msg->seq + 1; } else info("Incoming message (SNAPSHOT): seq=%"PRId64", msgid=%hd", msg->seq, msg->msgid); session->next_seq = msg->seq + 1; } else if (msg->seq > session->next_seq) { info("Incoming message (STORED): seq=%"PRId64", msgid=%hd", msg->seq, msg->msgid); if (msg->seq >= session->next_online_seq) session->next_online_seq = msg->seq + 1; if (store_message(msg) == -1) return -1; if (!session->is_restoring) { if (session_resend(session, session->next_seq) == -1) return -1; } return 0; } else { info("Incoming message (SKIPPED): seq=%"PRId64", msgid=%hd", msg->seq, msg->msgid); return 0; } } if (session_process_message2(session, msg) == -1) return -1; while (!storage_empty()) { struct frame *msg = top_message(); if (msg->seq != session->next_seq) break; info("Incoming message (RESTORED): seq=%"PRId64", msgid=%hd", msg->seq, msg->msgid); if (session_process_message2(session, msg) == -1) return -1; session->next_seq = msg->seq + 1; pop_message(); } return 0; } int session_process_message2(struct session *session, struct frame *msg) { switch (msg->msgid) { case HEARTBEAT: // do nothing break; case REJECT: if (session_on_reject(session, (struct reject_msg *)msg) == -1) return -1; break; case REPORT: // получить адрес шлюза и переоткрыть сокет if (session_on_report(session, (struct report_msg *)msg) == -1) return -1; break; case LOGON: // we are logged in if (session_on_logon(session, (struct logon_msg *)msg) == -1) return -1; break; case REJECT_REPORT: if (session_on_reject_report(session, (struct reject_report_msg *)msg) == -1) return -1; break; case ADD_REPORT: if (session_on_add_report(session, (struct add_report_msg *)msg) == -1) return -1; break; case CANCEL_REPORT: if (session_on_cancel_report(session, (struct cancel_report_msg *)msg) == -1) return -1; break; case EXECUTION: if (session_on_execution(session, (struct execution_msg *)msg) == -1) return -1; break; case RESEND_REPORT: if (session_on_resend_report(session, (struct resend_report_msg *)msg) == -1) return -1; } return 0; } int session_on_read(struct session *session) { if (session_read(session) == -1) { if (get_error() == AGAIN || get_error() == WOULDBLOCK) return 0; return -1; } set_timer(session->server_idle_timer_id, utime() + session->idle_timeout * 4, (int (*)(void*))session_server_idle_timeout, session); if (session_process_message(session, (struct frame *)session->read_buffer) == -1) return -1; session->read_buffer_size = 0; return 0; } int session_on_error(struct session *session) { return error("Connection is refused"), -1; } int session_on_write(struct session *session) { if (session->is_connecting) { #ifndef _WIN32 int flag; socklen_t len = sizeof(flag); if (getsockopt(session->sock, SOL_SOCKET, SO_ERROR, &flag, &len) == -1) return print_error("getsockopt failed"), -1; if (flag != 0) return error("Connection is refused"), -1; #endif session->is_connecting = 0; print_sock_info(session->sock); } if (session_send(session) == -1) { if (get_error() != AGAIN && get_error() != WOULDBLOCK) return -1; } return 0; } int getoptions(int argc, char *argv[], struct params *data) { int i; int mask = 0; for (i = 1; i < argc; ++i) { char dummy; if (argv[i][0] != '-') break; if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--host") == 0) { ++i; mask |= 0x8; if (i >= argc || strlen(argv[i]) > sizeof(data->host) / sizeof(*data->host)) return error("Inappropriate argument for \"host\": %s", argv[i]), -1; strncpy(data->host, argv[i], sizeof(data->host) / sizeof(*data->host)); } else if (strcmp(argv[i], "-p") == 0 || strcmp(argv[i], "--port") == 0) { ++i; mask |= 0x10; if (i >= argc || sscanf(argv[i], "%hu%c", &data->port, &dummy) != 1) return error("Inappropriate argument for \"port\": %s", argv[i]), -1; } else if (strcmp(argv[i], "-l") == 0 || strcmp(argv[i], "--login") == 0) { ++i; mask |= 0x20; if (i >= argc || strlen(argv[i]) > sizeof(data->login) / sizeof(*data->login)) return error("Inappropriate argument for \"login\": %s", argv[i]), -1; strncpy(data->login, argv[i], sizeof(data->login) / sizeof(*data->login)); } else if (strcmp(argv[i], "-w") == 0 || strcmp(argv[i], "--password") == 0) { ++i; mask |= 0x40; if (i >= argc || strlen(argv[i]) > sizeof(data->password) / sizeof(*data->password)) return error("Inappropriate argument for \"password\": %s", argv[i]), -1; strncpy(data->password, argv[i], sizeof(data->password) / sizeof(*data->password)); } else if (strcmp(argv[i], "-s") == 0 || strcmp(argv[i], "--seq") == 0) { ++i; if (sscanf(argv[i], "%"PRId64"%c", &data->seq, &dummy) != 1) return error("Inappropriate argument for \"seq\": %s", argv[i]), -1; } else return error("Unknown argument \"%s\"", argv[i]), -1; } if (mask != 0x78) return error("Some arguments are missing"), -1; return 0; } void sig_handler(int signum) { #ifdef _WIN32 if (WSACleanup() == -1) print_error("WSACleanup failed"); #endif } int main(int argc, char *argv[]) { struct params params; struct session session; #ifdef _WIN32 WSADATA wsa_data; #endif memset(¶ms, 0, sizeof(params)); memset(&session, 0, sizeof(session)); if (getoptions(argc, argv, ¶ms) == -1) { fprintf(stderr, "%s (-h|--host) (-p|--port) (-l|--login) (-w|--password) [(-s|--seq) ]", argv[0]); return -1; } #ifdef _WIN32 if (WSAStartup(MAKEWORD(2, 2), &wsa_data) == -1) return print_error("WSAStartup failed"), -1; #endif signal(SIGINT, sig_handler); signal(SIGTERM, sig_handler); if (session_init(&session, ¶ms) == -1) return -1; if (session_enter(&session) == -1) return -1; while (1) { int n_fds = 1; fd_set read_set, write_set, error_set; struct timeval timeval, *timeout; timeout = get_timeout(&timeval); fflush(stdout); FD_ZERO(&read_set); FD_ZERO(&write_set); FD_ZERO(&error_set); if (session.sock == -1 && timeout == NULL) { break; // there is nether socket nor timers to wait #ifdef _WIN32 } else if (session.sock == -1 && timeout != NULL) { Sleep((DWORD)timeout->tv_sec * 1000 + (DWORD)timeout->tv_usec / 1000); n_fds = 0; #endif } else { if (session.sock != -1) { FD_SET(session.sock, &read_set); if (session.write_buffer_size != 0 || session.is_connecting) // check for write only if it needs to FD_SET(session.sock, &write_set); if (session.is_connecting) FD_SET(session.sock, &error_set); n_fds = session.sock + 1; } n_fds = select(n_fds, &read_set, &write_set, &error_set, timeout); } if (n_fds == -1) { print_error("select failed"); break; } else if (n_fds == 0) debug("timeout"); else { if (session.is_connecting) if (FD_ISSET(session.sock, &error_set)) if (session_on_error(&session) == -1) break; if (session.write_buffer_size != 0 || session.is_connecting) if (FD_ISSET(session.sock, &write_set)) if (session_on_write(&session) == -1) break; if (FD_ISSET(session.sock, &read_set)) if (session_on_read(&session) == -1) break; } if (process_timers() == -1) break; } session_destroy(&session); return 0; }