[ https://issues.apache.org/jira/browse/TRAFODION-3334?focusedWorklogId=386254&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-386254 ]
ASF GitHub Bot logged work on TRAFODION-3334: --------------------------------------------- Author: ASF GitHub Bot Created on: 12/Feb/20 21:44 Start Date: 12/Feb/20 21:44 Worklog Time Spent: 10m Work Description: narendragoyal commented on pull request #1869: [TRAFODION-3334] Refactored and re-implemented monitor communication. URL: https://github.com/apache/trafodion/pull/1869#discussion_r377995049 ########## File path: core/sqf/monitor/linux/comm.cxx ########## @@ -0,0 +1,1757 @@ +/////////////////////////////////////////////////////////////////////////////// +// +// @@@ START COPYRIGHT @@@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// @@@ END COPYRIGHT @@@ +// +/////////////////////////////////////////////////////////////////////////////// + +#include <iostream> + +using namespace std; + +#include <errno.h> +#include <limits.h> +#include <stdio.h> +#include <stdlib.h> +#include <netdb.h> +#include <unistd.h> +#include <sys/epoll.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <netinet/tcp.h> + +#include "monlogging.h" +#include "montrace.h" +#include "comm.h" + +const char *EpollEventString( __uint32_t events ); +const char *EpollOpString( int op ); + +CComm::CComm( void ) + :epollFd_(-1) +{ + const char method_name[] = "CComm::CComm"; + TRACE_ENTRY; + + // Add eyecatcher sequence as a debugging aid + memcpy(&eyecatcher_, "COMM", 4); + + epollFd_ = epoll_create1( EPOLL_CLOEXEC ); + if ( epollFd_ < 0 ) + { + char ebuff[256]; + char buf[MON_STRING_BUF_SIZE]; + snprintf( buf, sizeof(buf), "[%s@%d] epoll_create1(sendrecv) error: %s\n", + method_name, __LINE__, strerror_r( errno, ebuff, 256 ) ); + mon_log_write( COMM_COMM_1, SQ_LOG_CRIT, buf ); + + mon_failure_exit(); + } + + TRACE_EXIT; +} + +CComm::~CComm( void ) +{ + const char method_name[] = "CComm::~CComm"; + TRACE_ENTRY; + + if (epollFd_ != -1) + { + close( epollFd_ ); + } + + // Alter eyecatcher sequence as a debugging aid to identify deleted object + memcpy(&eyecatcher_, "comm", 4); + + TRACE_EXIT; +} + +int CComm::Accept( int listenSock ) +{ + const char method_name[] = "CComm::Accept"; + TRACE_ENTRY; + +#if defined(_XOPEN_SOURCE_EXTENDED) +#ifdef __LP64__ + socklen_t size; // size of socket address +#else + size_t size; // size of socket address +#endif +#else + int size; // size of socket address +#endif + int csock; // connected socket + struct sockaddr_in sockinfo; // socket address info + + size = sizeof(struct sockaddr *); + if ( getsockname( listenSock, (struct sockaddr *) &sockinfo, &size ) ) + { + char buf[MON_STRING_BUF_SIZE]; + int err = errno; + snprintf(buf, sizeof(buf), "[%s], getsockname() failed, errno=%d (%s).\n", + method_name, err, strerror(err)); + mon_log_write(COMM_ACCEPT_1, SQ_LOG_ERR, buf); + return ( -1 ); + } + + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) + { + unsigned char *addrp = (unsigned char *) &sockinfo.sin_addr.s_addr; + trace_printf( "%s@%d - Accepting socket on addr=%d.%d.%d.%d, port=%d\n" + , method_name, __LINE__ + , addrp[0] + , addrp[1] + , addrp[2] + , addrp[3] + , (int) ntohs( sockinfo.sin_port ) ); + } + + while ( ((csock = accept( listenSock + , (struct sockaddr *) 0 + , (socklen_t *) 0 ) ) < 0) && (errno == EINTR) ); + + if ( csock > 0 ) + { + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) + { + unsigned char *addrp = (unsigned char *) &sockinfo.sin_addr.s_addr; + trace_printf( "%s@%d - Accepted socket on addr=%d.%d.%d.%d, " + "port=%d, listenSock=%d, csock=%d\n" + , method_name, __LINE__ + , addrp[0] + , addrp[1] + , addrp[2] + , addrp[3] + , (int) ntohs( sockinfo.sin_port ) + , listenSock + , csock ); + } + + int nodelay = 1; + if ( setsockopt( csock + , IPPROTO_TCP + , TCP_NODELAY + , (char *) &nodelay + , sizeof(int) ) ) + { + char buf[MON_STRING_BUF_SIZE]; + int err = errno; + snprintf(buf, sizeof(buf), "[%s], setsockopt() failed, errno=%d (%s).\n", + method_name, err, strerror(err)); + mon_log_write(COMM_ACCEPT_2, SQ_LOG_ERR, buf); + return ( -2 ); + } + + int reuse = 1; + if ( setsockopt( csock + , SOL_SOCKET + , SO_REUSEADDR + , (char *) &reuse + , sizeof(int) ) ) + { + char buf[MON_STRING_BUF_SIZE]; + int err = errno; + snprintf(buf, sizeof(buf), "[%s], setsockopt() failed, errno=%d (%s).\n", + method_name, err, strerror(err)); + mon_log_write(COMM_ACCEPT_3, SQ_LOG_ERR, buf); + return ( -2 ); + } + } + + TRACE_EXIT; + return ( csock ); +} + +void CComm::ConnectLocal( int port ) +{ + const char method_name[] = "CComm::ConnectLocal"; + TRACE_ENTRY; + + int sock; // socket + int ret; // returned value +#if defined(_XOPEN_SOURCE_EXTENDED) +#ifdef __LP64__ + socklen_t size; // size of socket address +#else + size_t size; // size of socket address +#endif +#else + int size; // size of socket address +#endif + static int retries = 0; // # times to retry connect + int connect_failures = 0; // # failed connects + char *p; // getenv results + struct sockaddr_in sockinfo; // socket address info + struct hostent *he; + + size = sizeof(sockinfo); + + if ( !retries ) + { + p = getenv( "HPMP_CONNECT_RETRIES" ); + if ( p ) retries = atoi( p ); + else retries = 5; + } + + sock = socket( AF_INET, SOCK_STREAM, 0 ); + if ( sock < 0 ) + { + char la_buf[MON_STRING_BUF_SIZE]; + int err = errno; + sprintf( la_buf, "[%s], socket() failed! errno=%d (%s)\n" + , method_name, err, strerror( err )); + mon_log_write( COMM_CONNECTLOCAL_1, SQ_LOG_CRIT, la_buf ); + + mon_failure_exit(); + } + + he = gethostbyname( "localhost" ); + if ( !he ) + { + char ebuff[256]; + char buf[MON_STRING_BUF_SIZE]; + snprintf( buf, sizeof(buf), "[%s@%d] gethostbyname(%s) error: %s\n", + method_name, __LINE__, "localhost", strerror_r( h_errno, ebuff, 256 ) ); + mon_log_write( COMM_CONNECTLOCAL_2, SQ_LOG_CRIT, buf ); + + mon_failure_exit(); + } + + // Connect socket. + memset( (char *) &sockinfo, 0, size ); + memcpy( (char *) &sockinfo.sin_addr, (char *) he->h_addr, 4 ); + sockinfo.sin_family = AF_INET; + sockinfo.sin_port = htons( (unsigned short) port ); + + connect_failures = 0; + ret = 1; + while ( ret != 0 && connect_failures <= 10 ) + { + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) + { + trace_printf( "%s@%d - Connecting to localhost addr=%d.%d.%d.%d, port=%d, connect_failures=%d\n" + , method_name, __LINE__ + , (int)((unsigned char *)he->h_addr)[0] + , (int)((unsigned char *)he->h_addr)[1] + , (int)((unsigned char *)he->h_addr)[2] + , (int)((unsigned char *)he->h_addr)[3] + , port + , connect_failures ); + } + + ret = connect( sock, (struct sockaddr *) &sockinfo, size ); + if ( ret == 0 ) break; + if ( errno == EINTR ) + { + ++connect_failures; + } + else + { + char la_buf[MON_STRING_BUF_SIZE]; + int err = errno; + sprintf( la_buf, "[%s], connect() failed! errno=%d (%s)\n" + , method_name, err, strerror( err )); + mon_log_write(COMM_CONNECTLOCAL_3, SQ_LOG_CRIT, la_buf); + + mon_failure_exit(); + } + } + + close( sock ); + + TRACE_EXIT; +} + +int CComm::Connect( const char *portName, bool doRetries ) +{ + const char method_name[] = "CComm::Connect"; + TRACE_ENTRY; + + int sock; // socket + int ret; // returned value + int nodelay = 1; // sockopt reuse option Review comment: typo in the comment - not a 'reuse' option :) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 386254) Time Spent: 2h 50m (was: 2h 40m) > Communication IO between monitor processes must use timeouts and retries > ------------------------------------------------------------------------ > > Key: TRAFODION-3334 > URL: https://issues.apache.org/jira/browse/TRAFODION-3334 > Project: Apache Trafodion > Issue Type: Bug > Components: foundation > Affects Versions: 2.4 > Reporter: Gonzalo E Correa > Priority: Major > Fix For: 2.4 > > Time Spent: 2h 50m > Remaining Estimate: 0h > > Most communication channels used by monitor processes to exchange cluster > state information and to handle failure detection must be changed to > asynchronous IO with timeouts and retries to allow for the removal of a > monitor process from the cluster communication. This is to prevent a 'Sync > Thread Timeout' failure of the entire cluster instance where a monitor > process or it host server becomes unresponsive due to a server or network > failure. -- This message was sent by Atlassian Jira (v8.3.4#803005)