IGNITE-6835: ODBC driver now handles ungraceful TCP disconnects. This closes #2997.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/46c480b4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/46c480b4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/46c480b4

Branch: refs/heads/ignite-zk
Commit: 46c480b46cb6c86ebcc3a94659c689057f9e1464
Parents: 9303845
Author: Igor Sapego <isap...@gridgain.com>
Authored: Tue Nov 14 15:48:02 2017 +0300
Committer: Igor Sapego <isap...@gridgain.com>
Committed: Tue Nov 14 15:49:04 2017 +0300

----------------------------------------------------------------------
 .../include/ignite/odbc/system/socket_client.h  |  20 +-
 .../odbc/os/linux/src/system/socket_client.cpp  | 163 ++++++++++++-
 .../odbc/os/win/src/system/socket_client.cpp    | 196 ++++++++++++++++++-
 modules/platforms/cpp/odbc/src/connection.cpp   |   2 +-
 4 files changed, 366 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/46c480b4/modules/platforms/cpp/odbc/include/ignite/odbc/system/socket_client.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/system/socket_client.h b/modules/platforms/cpp/odbc/include/ignite/odbc/system/socket_client.h
index ee58927..946605e 100644
--- a/modules/platforms/cpp/odbc/include/ignite/odbc/system/socket_client.h
+++ b/modules/platforms/cpp/odbc/include/ignite/odbc/system/socket_client.h
@@ -21,6 +21,7 @@
 #include <stdint.h>
 
 #include "ignite/common/common.h"
+#include "ignite/odbc/diagnostic/diagnosable.h"
 
 namespace ignite
 {
@@ -34,6 +35,15 @@ namespace ignite
             class SocketClient
             {
             public:
+                /** Socket send and receive buffer size in bytes. */
+                enum { BUFFER_SIZE = 0x10000 };
+
+                /** The time in seconds the connection needs to remain idle before TCP starts sending keepalive probes. */
+                enum { KEEP_ALIVE_IDLE_TIME = 60 };
+
+                /** The time in seconds between individual keepalive probes. */
+                enum { KEEP_ALIVE_PROBES_PERIOD = 1 };
+
                 /**
                  * Constructor.
                  */
@@ -49,9 +59,10 @@ namespace ignite
                  *
                  * @param hostname Remote host name.
                  * @param port TCP service port.
+                 * @param diag Diagnostics collector.
                  * @return True on success.
                  */
-                bool Connect(const char* hostname, uint16_t port);
+                bool Connect(const char* hostname, uint16_t port, diagnostic::Diagnosable& diag);
 
                 /**
                  * Close established connection.
@@ -73,7 +84,7 @@ namespace ignite
                 /**
                  * Receive data from established connection.
                  *
-                 * @param data Pointer to data buffer.
+                 * @param buffer Pointer to data buffer.
                  * @param size Size of the buffer in bytes.
                  * @return Number of bytes that have been received on success and negative
                  * value on failure.
@@ -81,6 +92,11 @@ namespace ignite
                 int Receive(int8_t* buffer, size_t size);
 
             private:
+                /**
+                 * Try to set socket options, reporting any failure to diag.
+                 */
+                void TrySetOptions(diagnostic::Diagnosable& diag);
+
                 intptr_t socketHandle;
 
                 IGNITE_NO_COPY_ASSIGNMENT(SocketClient)

http://git-wip-us.apache.org/repos/asf/ignite/blob/46c480b4/modules/platforms/cpp/odbc/os/linux/src/system/socket_client.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/os/linux/src/system/socket_client.cpp b/modules/platforms/cpp/odbc/os/linux/src/system/socket_client.cpp
index 9bdf1d7..5a9b03a 100644
--- a/modules/platforms/cpp/odbc/os/linux/src/system/socket_client.cpp
+++ b/modules/platforms/cpp/odbc/os/linux/src/system/socket_client.cpp
@@ -17,7 +17,7 @@
 
 #include <sys/socket.h>
 #include <sys/types.h>
-#include <sys/socket.h>
+#include <netinet/tcp.h>
 #include <netdb.h>
 #include <unistd.h>
 
@@ -31,6 +31,52 @@
 
 #define SOCKET_ERROR (-1)
 
+namespace
+{
+    /**
+     * Extract the message from the result of the XSI-compliant strerror_r(),
+     * which returns an int and writes the message into the supplied buffer.
+     */
+    inline const char* StrErrorResult(int, const char* buffer)
+    {
+        return buffer;
+    }
+
+    /**
+     * Extract the message from the result of the GNU strerror_r(), which
+     * returns a pointer to the message and may leave the buffer untouched.
+     */
+    inline const char* StrErrorResult(const char* msg, const char*)
+    {
+        return msg;
+    }
+
+    /**
+     * Get last socket error message.
+     * @return Last socket error message string.
+     */
+    std::string GetLastSocketErrorMessage()
+    {
+        int lastError = errno;
+        std::stringstream res;
+
+        res << "error_code=" << lastError;
+
+        if (lastError == 0)
+            return res.str();
+
+        char buffer[1024] = "";
+
+        // strerror_r() comes in two incompatible flavours (XSI and GNU);
+        // the StrErrorResult() overloads pick the right message for either.
+        const char* msg = StrErrorResult(strerror_r(lastError, buffer, sizeof(buffer)), buffer);
+
+        res << ", msg=" << msg;
+
+        return res.str();
+    }
+}
+
 namespace ignite
 {
     namespace odbc
@@ -48,11 +94,12 @@ namespace ignite
                 Close();
             }
 
-            bool SocketClient::Connect(const char* hostname, uint16_t port)
+            bool SocketClient::Connect(const char* hostname, uint16_t port, diagnostic::Diagnosable& diag)
             {
                 LOG_MSG("Host: " << hostname << ", port: " << port);
 
                 addrinfo hints;
+                memset(&hints, 0, sizeof(hints));
                 hints.ai_family = AF_UNSPEC;
                 hints.ai_socktype = SOCK_STREAM;
 
@@ -66,26 +113,47 @@ namespace ignite
                 int res = getaddrinfo(hostname, converter.str().c_str(), &hints, &result);
 
                 if (res != 0)
+                {
+                    LOG_MSG("Address resolving failed: " << gai_strerror(res));
+
+                    diag.AddStatusRecord(SqlState::SHY000_GENERAL_ERROR, "Can not resolve host address.");
+
                     return false;
+                }
 
                 // Attempt to connect to an address until one succeeds
-                for (addrinfo *it = result; it != NULL; it = it->ai_next) 
+                for (addrinfo *it = result; it != NULL; it = it->ai_next)
                 {
-                    LOG_MSG("Addr: " << it->ai_addr->sa_data[2] << "."
-                                     << it->ai_addr->sa_data[3] << "."
-                                     << it->ai_addr->sa_data[4] << "."
-                                     << it->ai_addr->sa_data[5]);
+                    LOG_MSG("Addr: " << (it->ai_addr->sa_data[2] & 0xFF) << "."
+                                     << (it->ai_addr->sa_data[3] & 0xFF) << "."
+                                     << (it->ai_addr->sa_data[4] & 0xFF) << "."
+                                     << (it->ai_addr->sa_data[5] & 0xFF));
 
                     // Create a SOCKET for connecting to server
                     socketHandle = socket(it->ai_family, it->ai_socktype, it->ai_protocol);
 
                     if (socketHandle == SOCKET_ERROR)
+                    {
+                        LOG_MSG("Socket creation failed: " << GetLastSocketErrorMessage());
+
+                        diag.AddStatusRecord(SqlState::SHY000_GENERAL_ERROR, "Can not create new socket.");
+
+                        // The address list is not needed anymore: release it before bailing out.
+                        freeaddrinfo(result);
+
                         return false;
+                    }
+
+                    diag.GetDiagnosticRecords().Reset();
+
+                    TrySetOptions(diag);
 
                     // Connect to server.
-                    res = connect(socketHandle, it->ai_addr, (int)it->ai_addrlen);
-                    if (res == SOCKET_ERROR)
+                    res = connect(socketHandle, it->ai_addr, static_cast<int>(it->ai_addrlen));
+                    if (SOCKET_ERROR == res)
                     {
+                        LOG_MSG("Connection failed: " << GetLastSocketErrorMessage());
+
                         Close();
 
                         continue;
@@ -117,6 +185,83 @@ namespace ignite
             {
                 return recv(socketHandle, reinterpret_cast<char*>(buffer), static_cast<int>(size), 0);
             }
+
+            void SocketClient::TrySetOptions(diagnostic::Diagnosable& diag)
+            {
+                int trueOpt = 1;
+                int bufSizeOpt = BUFFER_SIZE;
+                int idleOpt = KEEP_ALIVE_IDLE_TIME;
+                int idleRetryOpt = KEEP_ALIVE_PROBES_PERIOD;
+
+                int res = setsockopt(socketHandle, SOL_SOCKET, SO_SNDBUF,
+                    reinterpret_cast<char*>(&bufSizeOpt), sizeof(bufSizeOpt));
+
+                if (SOCKET_ERROR == res)
+                {
+                    LOG_MSG("TCP socket send buffer size setup failed: " << GetLastSocketErrorMessage());
+
+                    diag.AddStatusRecord(SqlState::S01S02_OPTION_VALUE_CHANGED,
+                        "Can not set up TCP socket send buffer size");
+                }
+
+                res = setsockopt(socketHandle, SOL_SOCKET, SO_RCVBUF,
+                    reinterpret_cast<char*>(&bufSizeOpt), sizeof(bufSizeOpt));
+
+                if (SOCKET_ERROR == res)
+                {
+                    LOG_MSG("TCP socket receive buffer size setup failed: " << GetLastSocketErrorMessage());
+
+                    diag.AddStatusRecord(SqlState::S01S02_OPTION_VALUE_CHANGED,
+                        "Can not set up TCP socket receive buffer size");
+                }
+
+                res = setsockopt(socketHandle, IPPROTO_TCP, TCP_NODELAY,
+                    reinterpret_cast<char*>(&trueOpt), sizeof(trueOpt));
+
+                if (SOCKET_ERROR == res)
+                {
+                    LOG_MSG("TCP no-delay mode setup failed: " << GetLastSocketErrorMessage());
+
+                    diag.AddStatusRecord(SqlState::S01S02_OPTION_VALUE_CHANGED,
+                        "Can not set up TCP no-delay mode");
+                }
+
+                res = setsockopt(socketHandle, SOL_SOCKET, SO_KEEPALIVE,
+                    reinterpret_cast<char*>(&trueOpt), sizeof(trueOpt));
+
+                if (SOCKET_ERROR == res)
+                {
+                    LOG_MSG("TCP keep-alive mode setup failed: " << GetLastSocketErrorMessage());
+
+                    diag.AddStatusRecord(SqlState::S01S02_OPTION_VALUE_CHANGED,
+                        "Can not set up TCP keep-alive mode");
+
+                    // There is no sense in configuring keep alive params if we failed to set up keep alive mode.
+                    return;
+                }
+
+                res = setsockopt(socketHandle, IPPROTO_TCP, TCP_KEEPIDLE,
+                    reinterpret_cast<char*>(&idleOpt), sizeof(idleOpt));
+
+                if (SOCKET_ERROR == res)
+                {
+                    LOG_MSG("TCP keep-alive idle timeout setup failed: " << GetLastSocketErrorMessage());
+
+                    diag.AddStatusRecord(SqlState::S01S02_OPTION_VALUE_CHANGED,
+                        "Can not set up TCP keep-alive idle timeout");
+                }
+
+                res = setsockopt(socketHandle, IPPROTO_TCP, TCP_KEEPINTVL,
+                    reinterpret_cast<char*>(&idleRetryOpt), sizeof(idleRetryOpt));
+
+                if (SOCKET_ERROR == res)
+                {
+                    LOG_MSG("TCP keep-alive probes period setup failed: " << GetLastSocketErrorMessage());
+
+                    diag.AddStatusRecord(SqlState::S01S02_OPTION_VALUE_CHANGED,
+                        "Can not set up TCP keep-alive probes period");
+                }
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/46c480b4/modules/platforms/cpp/odbc/os/win/src/system/socket_client.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/os/win/src/system/socket_client.cpp b/modules/platforms/cpp/odbc/os/win/src/system/socket_client.cpp
index 4c440e2..30fb7d7 100644
--- a/modules/platforms/cpp/odbc/os/win/src/system/socket_client.cpp
+++ b/modules/platforms/cpp/odbc/os/win/src/system/socket_client.cpp
@@ -21,6 +21,7 @@
 #include <windows.h>
 #include <winsock2.h>
 #include <ws2tcpip.h>
+#include <mstcpip.h>
 
 #include <cstring>
 
@@ -30,6 +31,54 @@
 #include "ignite/odbc/utility.h"
 #include "ignite/odbc/log.h"
 
+namespace
+{
+    /**
+     * Get last socket error message.
+     * @return Last socket error message string.
+     */
+    std::string GetLastSocketErrorMessage()
+    {
+        int lastError = WSAGetLastError();
+        std::stringstream res;
+
+        res << "error_code=" << lastError;
+
+        if (lastError == 0)
+            return res.str();
+
+        LPSTR errorText = NULL;
+
+        DWORD len = FormatMessageA(
+            // use system message tables to retrieve error text
+            FORMAT_MESSAGE_FROM_SYSTEM
+            // allocate buffer on local heap for error text
+            | FORMAT_MESSAGE_ALLOCATE_BUFFER
+            // We're not passing insertion parameters
+            | FORMAT_MESSAGE_IGNORE_INSERTS,
+            // unused with FORMAT_MESSAGE_FROM_SYSTEM
+            NULL,
+            lastError,
+            MAKELANGID(LANG_ENGLISH, SUBLANG_ENGLISH_US),
+            // output: narrow (ANSI) string allocated by the system, matching std::string below
+            reinterpret_cast<LPSTR>(&errorText),
+            // minimum size for output buffer
+            0,
+            // no insertion arguments
+            NULL);
+
+        if (NULL != errorText)
+        {
+            if (len != 0)
+                res << ", msg=" << std::string(errorText, len);
+
+            LocalFree(errorText);
+        }
+
+        return res.str();
+    }
+}
+
 namespace ignite
 {
     namespace odbc
@@ -47,7 +96,7 @@ namespace ignite
                 Close();
             }
 
-            bool SocketClient::Connect(const char* hostname, uint16_t port)
+            bool SocketClient::Connect(const char* hostname, uint16_t port, diagnostic::Diagnosable& diag)
             {
                 static bool networkInited = false;
 
@@ -59,10 +108,15 @@ namespace ignite
                     networkInited = (WSAStartup(MAKEWORD(2, 2), &wsaData) == 0);
 
                     if (!networkInited)
+                    {
+                        LOG_MSG("Networking initialisation failed: " << GetLastSocketErrorMessage());
+
+                        diag.AddStatusRecord(SqlState::SHY000_GENERAL_ERROR, "Can not initialize Windows networking.");
+
                         return false;
+                    }
                 }
 
-                addrinfo *result = NULL;
                 addrinfo hints;
 
                 LOG_MSG("Host: " << hostname << " port: " << port);
@@ -76,10 +130,17 @@ namespace ignite
                 converter << port;
 
                 // Resolve the server address and port
+                addrinfo *result = NULL;
                 int res = getaddrinfo(hostname, converter.str().c_str(), &hints, &result);
 
                 if (res != 0)
+                {
+                    LOG_MSG("Address resolving failed: " << GetLastSocketErrorMessage());
+
+                    diag.AddStatusRecord(SqlState::SHY000_GENERAL_ERROR, "Can not resolve host address.");
+
                     return false;
+                }
 
                 // Attempt to connect to an address until one succeeds
                 for (addrinfo *it = result; it != NULL; it = it->ai_next)
@@ -93,12 +154,27 @@ namespace ignite
                     socketHandle = socket(it->ai_family, it->ai_socktype, it->ai_protocol);
 
                     if (socketHandle == INVALID_SOCKET)
+                    {
+                        LOG_MSG("Socket creation failed: " << GetLastSocketErrorMessage());
+
+                        diag.AddStatusRecord(SqlState::SHY000_GENERAL_ERROR, "Can not create new socket.");
+
+                        // The address list is not needed anymore: release it before bailing out.
+                        freeaddrinfo(result);
+
                         return false;
+                    }
+
+                    diag.GetDiagnosticRecords().Reset();
+
+                    TrySetOptions(diag);
 
                     // Connect to server.
                     res = connect(socketHandle, it->ai_addr, static_cast<int>(it->ai_addrlen));
-                    if (res == SOCKET_ERROR)
+                    if (SOCKET_ERROR == res)
                     {
+                        LOG_MSG("Connection failed: " << GetLastSocketErrorMessage());
+
                         Close();
 
                         continue;
@@ -130,6 +206,120 @@ namespace ignite
             {
                 return recv(socketHandle, reinterpret_cast<char*>(buffer), static_cast<int>(size), 0);
             }
+
+            void SocketClient::TrySetOptions(diagnostic::Diagnosable& diag)
+            {
+                BOOL trueOpt = TRUE;
+                int bufSizeOpt = BUFFER_SIZE;
+
+                int res = setsockopt(socketHandle, SOL_SOCKET, SO_SNDBUF,
+                    reinterpret_cast<char*>(&bufSizeOpt), sizeof(bufSizeOpt));
+
+                if (SOCKET_ERROR == res)
+                {
+                    LOG_MSG("TCP socket send buffer size setup failed: " << GetLastSocketErrorMessage());
+
+                    diag.AddStatusRecord(SqlState::S01S02_OPTION_VALUE_CHANGED,
+                        "Can not set up TCP socket send buffer size");
+                }
+
+                res = setsockopt(socketHandle, SOL_SOCKET, SO_RCVBUF,
+                    reinterpret_cast<char*>(&bufSizeOpt), sizeof(bufSizeOpt));
+
+                if (SOCKET_ERROR == res)
+                {
+                    LOG_MSG("TCP socket receive buffer size setup failed: " << GetLastSocketErrorMessage());
+
+                    diag.AddStatusRecord(SqlState::S01S02_OPTION_VALUE_CHANGED,
+                        "Can not set up TCP socket receive buffer size");
+                }
+
+                res = setsockopt(socketHandle, IPPROTO_TCP, TCP_NODELAY,
+                    reinterpret_cast<char*>(&trueOpt), sizeof(trueOpt));
+
+                if (SOCKET_ERROR == res)
+                {
+                    LOG_MSG("TCP no-delay mode setup failed: " << GetLastSocketErrorMessage());
+
+                    diag.AddStatusRecord(SqlState::S01S02_OPTION_VALUE_CHANGED,
+                        "Can not set up TCP no-delay mode");
+                }
+
+                res = setsockopt(socketHandle, SOL_SOCKET, SO_KEEPALIVE,
+                    reinterpret_cast<char*>(&trueOpt), sizeof(trueOpt));
+
+                if (SOCKET_ERROR == res)
+                {
+                    LOG_MSG("TCP keep-alive mode setup failed: " << GetLastSocketErrorMessage());
+
+                    diag.AddStatusRecord(SqlState::S01S02_OPTION_VALUE_CHANGED,
+                        "Can not set up TCP keep-alive mode");
+
+                    // There is no sense in configuring keep alive params if we failed to set up keep alive mode.
+                    return;
+                }
+
+                // This option is available starting with Windows 10, version 1709.
+#if defined(TCP_KEEPIDLE) && defined(TCP_KEEPINTVL)
+                DWORD idleOpt = KEEP_ALIVE_IDLE_TIME;
+                DWORD idleRetryOpt = KEEP_ALIVE_PROBES_PERIOD;
+
+                res = setsockopt(socketHandle, IPPROTO_TCP, TCP_KEEPIDLE,
+                    reinterpret_cast<char*>(&idleOpt), sizeof(idleOpt));
+
+                if (SOCKET_ERROR == res)
+                {
+                    LOG_MSG("TCP keep-alive idle timeout setup failed: " << GetLastSocketErrorMessage());
+
+                    diag.AddStatusRecord(SqlState::S01S02_OPTION_VALUE_CHANGED,
+                        "Can not set up TCP keep-alive idle timeout");
+                }
+
+                res = setsockopt(socketHandle, IPPROTO_TCP, TCP_KEEPINTVL,
+                    reinterpret_cast<char*>(&idleRetryOpt), sizeof(idleRetryOpt));
+
+                if (SOCKET_ERROR == res)
+                {
+                    LOG_MSG("TCP keep-alive probes period setup failed: " << GetLastSocketErrorMessage());
+
+                    diag.AddStatusRecord(SqlState::S01S02_OPTION_VALUE_CHANGED,
+                        "Can not set up TCP keep-alive probes period");
+                }
+#else // Older Windows: fall back to WSAIoctl(SIO_KEEPALIVE_VALS)
+
+                // WinSock structure for KeepAlive timing settings
+                struct tcp_keepalive settings = {0};
+                settings.onoff = 1;
+                settings.keepalivetime = KEEP_ALIVE_IDLE_TIME * 1000;
+                settings.keepaliveinterval = KEEP_ALIVE_PROBES_PERIOD * 1000;
+
+                // Receives the output size; SIO_KEEPALIVE_VALS has no output buffer.
+                DWORD bytesReturned = 0;
+
+
+                // Set KeepAlive settings synchronously: no WSAOVERLAPPED and no completion routine.
+                res = WSAIoctl(
+                    socketHandle,
+                    SIO_KEEPALIVE_VALS,
+                    &settings,
+                    sizeof(struct tcp_keepalive),
+                    NULL,
+                    0,
+                    &bytesReturned,
+                    NULL,
+                    NULL
+                );
+
+                if (SOCKET_ERROR == res)
+                {
+                    LOG_MSG("TCP keep-alive params setup failed: " << GetLastSocketErrorMessage());
+
+                    diag.AddStatusRecord(SqlState::S01S02_OPTION_VALUE_CHANGED,
+                        "Can not set up TCP keep-alive idle timeout and probes period");
+                }
+#endif
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/46c480b4/modules/platforms/cpp/odbc/src/connection.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/src/connection.cpp b/modules/platforms/cpp/odbc/src/connection.cpp
index 8f4bf14..b99d768 100644
--- a/modules/platforms/cpp/odbc/src/connection.cpp
+++ b/modules/platforms/cpp/odbc/src/connection.cpp
@@ -126,7 +126,7 @@ namespace ignite
                 return SqlResult::AI_ERROR;
             }
 
-            connected = socket.Connect(cfg.GetHost().c_str(), cfg.GetTcpPort());
+            connected = socket.Connect(cfg.GetHost().c_str(), cfg.GetTcpPort(), *this);
 
             if (!connected)
             {