Repository: thrift Updated Branches: refs/heads/master 02fbe0ecc -> 859a40cf5
THRIFT-4515: cross server test improvement: graceful test server shutdown This closes #1509 Project: http://git-wip-us.apache.org/repos/asf/thrift/repo Commit: http://git-wip-us.apache.org/repos/asf/thrift/commit/9bea32f7 Tree: http://git-wip-us.apache.org/repos/asf/thrift/tree/9bea32f7 Diff: http://git-wip-us.apache.org/repos/asf/thrift/diff/9bea32f7 Branch: refs/heads/master Commit: 9bea32f73c36a8f53a45e818cfafe81b6fefefae Parents: 02fbe0e Author: James E. King III <jk...@apache.org> Authored: Fri Mar 16 16:07:42 2018 -0400 Committer: James E. King III <jk...@apache.org> Committed: Mon Mar 19 14:38:49 2018 -0400 ---------------------------------------------------------------------- build/cmake/ConfigureChecks.cmake | 1 + build/cmake/config.h.in | 3 + build/docker/scripts/autotools.sh | 4 - configure.ac | 1 + lib/cpp/src/thrift/TOutput.cpp | 2 +- .../src/thrift/server/TNonblockingServer.cpp | 2 + lib/cpp/src/thrift/transport/TSSLSocket.cpp | 8 + lib/cpp/src/thrift/transport/TSocket.cpp | 12 +- lib/cpp/test/concurrency/TimerManagerTests.h | 34 ++- lib/d/test/thrift_test_server.d | 58 +++++- lib/perl/lib/Thrift/Server.pm | 38 ++-- lib/perl/lib/Thrift/ServerSocket.pm | 15 +- test/cpp/src/TestServer.cpp | 51 +++-- test/crossrunner/collect.py | 4 +- test/crossrunner/report.py | 10 +- test/crossrunner/run.py | 206 +++++++++++-------- test/crossrunner/test.py | 10 +- test/crossrunner/util.py | 4 + test/perl/TestServer.pl | 8 + test/test.py | 21 +- test/tests.json | 13 +- 21 files changed, 332 insertions(+), 173 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/thrift/blob/9bea32f7/build/cmake/ConfigureChecks.cmake ---------------------------------------------------------------------- diff --git a/build/cmake/ConfigureChecks.cmake b/build/cmake/ConfigureChecks.cmake index e4793d4..6b9c6a3 100644 --- a/build/cmake/ConfigureChecks.cmake +++ b/build/cmake/ConfigureChecks.cmake @@ -34,6 +34,7 @@ check_include_file(getopt.h HAVE_GETOPT_H) check_include_file(inttypes.h HAVE_INTTYPES_H) check_include_file(netdb.h HAVE_NETDB_H) check_include_file(netinet/in.h HAVE_NETINET_IN_H) +check_include_file(signal.h HAVE_SIGNAL_H) check_include_file(stdint.h HAVE_STDINT_H) check_include_file(unistd.h HAVE_UNISTD_H) check_include_file(pthread.h HAVE_PTHREAD_H) http://git-wip-us.apache.org/repos/asf/thrift/blob/9bea32f7/build/cmake/config.h.in ---------------------------------------------------------------------- diff --git a/build/cmake/config.h.in b/build/cmake/config.h.in index 21561b2..c5d4d30 100644 --- a/build/cmake/config.h.in +++ b/build/cmake/config.h.in @@ -91,6 +91,9 @@ /* Define to 1 if you have the <netinet/in.h> header file. */ #cmakedefine HAVE_NETINET_IN_H 1 +/* Define to 1 if you have the <signal.h> header file. */ +#cmakedefine HAVE_SIGNAL_H 1 + /* Define to 1 if you have the <stdint.h> header file. */ #cmakedefine HAVE_STDINT_H 1 http://git-wip-us.apache.org/repos/asf/thrift/blob/9bea32f7/build/docker/scripts/autotools.sh ---------------------------------------------------------------------- diff --git a/build/docker/scripts/autotools.sh b/build/docker/scripts/autotools.sh index 67e4f2d..8388f72 100755 --- a/build/docker/scripts/autotools.sh +++ b/build/docker/scripts/autotools.sh @@ -1,10 +1,6 @@ #!/bin/sh set -ev -# haxe hxcpp > 3.4.188 will enable c++11 by default, and break the -# build when compiling C files with clang++ by adding -std=c++11 -export HXCPP_NO_CPP11=1 - ./bootstrap.sh ./configure $* make check -j3 http://git-wip-us.apache.org/repos/asf/thrift/blob/9bea32f7/configure.ac ---------------------------------------------------------------------- diff --git a/configure.ac b/configure.ac index 9efc28b..917f6fa 100755 --- a/configure.ac +++ b/configure.ac @@ -633,6 +633,7 @@ AC_CHECK_HEADERS([limits.h]) AC_CHECK_HEADERS([netdb.h]) AC_CHECK_HEADERS([netinet/in.h]) AC_CHECK_HEADERS([pthread.h]) +AC_CHECK_HEADERS([signal.h]) AC_CHECK_HEADERS([stddef.h]) AC_CHECK_HEADERS([stdlib.h]) AC_CHECK_HEADERS([sys/ioctl.h]) http://git-wip-us.apache.org/repos/asf/thrift/blob/9bea32f7/lib/cpp/src/thrift/TOutput.cpp ---------------------------------------------------------------------- diff --git a/lib/cpp/src/thrift/TOutput.cpp b/lib/cpp/src/thrift/TOutput.cpp index bb46263..ae3a9e2 100644 --- a/lib/cpp/src/thrift/TOutput.cpp +++ b/lib/cpp/src/thrift/TOutput.cpp @@ -94,7 +94,7 @@ void TOutput::errorTimeWrapper(const char* msg) { } void TOutput::perror(const char* message, int errno_copy) { - std::string out = message + strerror_s(errno_copy); + std::string out = message + std::string(": ") + strerror_s(errno_copy); f_(out.c_str()); } http://git-wip-us.apache.org/repos/asf/thrift/blob/9bea32f7/lib/cpp/src/thrift/server/TNonblockingServer.cpp ---------------------------------------------------------------------- diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.cpp b/lib/cpp/src/thrift/server/TNonblockingServer.cpp index e60bffc..f89b5f7 100644 --- a/lib/cpp/src/thrift/server/TNonblockingServer.cpp +++ b/lib/cpp/src/thrift/server/TNonblockingServer.cpp @@ -659,6 +659,7 @@ void TNonblockingServer::TConnection::transition() { return; } } + // fallthrough // Intentionally fall through here, the call to process has written into // the writeBuffer_ @@ -707,6 +708,7 @@ void TNonblockingServer::TConnection::transition() { server_->getIdleWriteBufferLimit()); callsForResize_ = 0; } + // fallthrough // N.B.: We also intentionally fall through here into the INIT state! http://git-wip-us.apache.org/repos/asf/thrift/blob/9bea32f7/lib/cpp/src/thrift/transport/TSSLSocket.cpp ---------------------------------------------------------------------- diff --git a/lib/cpp/src/thrift/transport/TSSLSocket.cpp b/lib/cpp/src/thrift/transport/TSSLSocket.cpp index 7bdacb0..7071698 100644 --- a/lib/cpp/src/thrift/transport/TSSLSocket.cpp +++ b/lib/cpp/src/thrift/transport/TSSLSocket.cpp @@ -304,6 +304,7 @@ bool TSSLSocket::peek() { && (errno_copy != THRIFT_EAGAIN)) { break; } + // fallthrough case SSL_ERROR_WANT_READ: case SSL_ERROR_WANT_WRITE: // in the case of SSL_ERROR_SYSCALL we want to wait for an read event again @@ -350,6 +351,7 @@ void TSSLSocket::close() { && (errno_copy != THRIFT_EAGAIN)) { break; } + // fallthrough case SSL_ERROR_WANT_READ: case SSL_ERROR_WANT_WRITE: // in the case of SSL_ERROR_SYSCALL we want to wait for an write/read event again @@ -415,6 +417,8 @@ uint32_t TSSLSocket::read(uint8_t* buf, uint32_t len) { // a certain number break; } + // fallthrough + case SSL_ERROR_WANT_READ: case SSL_ERROR_WANT_WRITE: if (isLibeventSafe()) { @@ -471,6 +475,7 @@ void TSSLSocket::write(const uint8_t* buf, uint32_t len) { && (errno_copy != THRIFT_EAGAIN)) { break; } + // fallthrough case SSL_ERROR_WANT_READ: case SSL_ERROR_WANT_WRITE: if (isLibeventSafe()) { @@ -515,6 +520,7 @@ uint32_t TSSLSocket::write_partial(const uint8_t* buf, uint32_t len) { && (errno_copy != THRIFT_EAGAIN)) { break; } + // fallthrough case SSL_ERROR_WANT_READ: case SSL_ERROR_WANT_WRITE: if (isLibeventSafe()) { @@ -602,6 +608,7 @@ void TSSLSocket::initializeHandshake() { && (errno_copy != THRIFT_EAGAIN)) { break; } + // fallthrough case SSL_ERROR_WANT_READ: case SSL_ERROR_WANT_WRITE: if (isLibeventSafe()) { @@ -634,6 +641,7 @@ void TSSLSocket::initializeHandshake() { && (errno_copy != THRIFT_EAGAIN)) { break; } + // fallthrough case SSL_ERROR_WANT_READ: case SSL_ERROR_WANT_WRITE: if (isLibeventSafe()) { http://git-wip-us.apache.org/repos/asf/thrift/blob/9bea32f7/lib/cpp/src/thrift/transport/TSocket.cpp ---------------------------------------------------------------------- diff --git a/lib/cpp/src/thrift/transport/TSocket.cpp b/lib/cpp/src/thrift/transport/TSocket.cpp index c90593d..18cadbc 100644 --- a/lib/cpp/src/thrift/transport/TSocket.cpp +++ b/lib/cpp/src/thrift/transport/TSocket.cpp @@ -809,11 +809,15 @@ void TSocket::setMaxRecvRetries(int maxRecvRetries) { string TSocket::getSocketInfo() { std::ostringstream oss; - if (host_.empty() || port_ == 0) { - oss << "<Host: " << getPeerAddress(); - oss << " Port: " << getPeerPort() << ">"; + if (path_.empty()) { + if (host_.empty() || port_ == 0) { + oss << "<Host: " << getPeerAddress(); + oss << " Port: " << getPeerPort() << ">"; + } else { + oss << "<Host: " << host_ << " Port: " << port_ << ">"; + } } else { - oss << "<Host: " << host_ << " Port: " << port_ << ">"; + oss << "<Path: " << path_ << ">"; } return oss.str(); } http://git-wip-us.apache.org/repos/asf/thrift/blob/9bea32f7/lib/cpp/test/concurrency/TimerManagerTests.h ---------------------------------------------------------------------- diff --git a/lib/cpp/test/concurrency/TimerManagerTests.h b/lib/cpp/test/concurrency/TimerManagerTests.h index 3779b0d..1c52c47 100644 --- a/lib/cpp/test/concurrency/TimerManagerTests.h +++ b/lib/cpp/test/concurrency/TimerManagerTests.h @@ -79,14 +79,13 @@ public: = shared_ptr<TimerManagerTests::Task>(new TimerManagerTests::Task(_monitor, 10 * timeout)); { - TimerManager timerManager; - timerManager.threadFactory(shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory())); - timerManager.start(); - - assert(timerManager.state() == TimerManager::STARTED); + if (timerManager.state() != TimerManager::STARTED) { + std::cerr << "timerManager is not in the STARTED state, but should be" << std::endl; + return false; + } // Don't create task yet, because its constructor sets the expected completion time, and we // need to delay between inserting the two tasks into the run queue. @@ -94,34 +93,27 @@ public: { Synchronized s(_monitor); - timerManager.add(orphanTask, 10 * timeout); - try { - // Wait for 1 second in order to give timerManager a chance to start sleeping in response - // to adding orphanTask. We need to do this so we can verify that adding the second task - // kicks the dispatcher out of the current wait and starts the new 1 second wait. - _monitor.wait(1000); - assert( - 0 == "ERROR: This wait should time out. TimerManager dispatcher may have a problem."); - } catch (TimedOutException&) { - } + THRIFT_SLEEP_USEC(timeout * 1000); task.reset(new TimerManagerTests::Task(_monitor, timeout)); - timerManager.add(task, timeout); - _monitor.wait(); } - assert(task->_done); + if (!task->_done) { + std::cerr << "task is not done, but it should have executed" << std::endl; + return false; + } std::cout << "\t\t\t" << (task->_success ? "Success" : "Failure") << "!" << std::endl; } - // timerManager.stop(); This is where it happens via destructor - - assert(!orphanTask->_done); + if (orphanTask->_done) { + std::cerr << "orphan task is done, but it should not have executed" << std::endl; + return false; + } return true; } http://git-wip-us.apache.org/repos/asf/thrift/blob/9bea32f7/lib/d/test/thrift_test_server.d ---------------------------------------------------------------------- diff --git a/lib/d/test/thrift_test_server.d b/lib/d/test/thrift_test_server.d index 71ab917..b582253 100644 --- a/lib/d/test/thrift_test_server.d +++ b/lib/d/test/thrift_test_server.d @@ -16,8 +16,11 @@ * specific language governing permissions and limitations * under the License. */ + module thrift_test_server; +import core.stdc.errno : errno; +import core.stdc.signal : signal, sigfn_t, SIGINT, SIG_DFL, SIG_ERR; import core.thread : dur, Thread; import std.algorithm; import std.exception : enforce; @@ -40,6 +43,7 @@ import thrift.transport.buffered; import thrift.transport.framed; import thrift.transport.http; import thrift.transport.ssl; +import thrift.util.cancellation; import thrift.util.hashset; import test_utils; @@ -205,14 +209,44 @@ private: bool trace_; } +shared(bool) gShutdown = false; + +nothrow @nogc extern(C) void handleSignal(int sig) { + gShutdown = true; +} + +// Runs a thread that waits for shutdown to be +// signaled and then triggers cancellation, +// causing the server to stop. While we could +// use a signalfd for this purpose, we are instead +// opting for a busy waiting scheme for maximum +// portability since signalfd is a linux thing. + +class ShutdownThread : Thread { + this(TCancellationOrigin cancellation) { + cancellation_ = cancellation; + super(&run); + } + +private: + void run() { + while (!gShutdown) { + Thread.sleep(dur!("msecs")(25)); + } + cancellation_.trigger(); + } + + TCancellationOrigin cancellation_; +} + void main(string[] args) { ushort port = 9090; ServerType serverType; ProtocolType protocolType; size_t numIOThreads = 1; TransportType transportType; - bool ssl; - bool trace; + bool ssl = false; + bool trace = true; size_t taskPoolSize = totalCPUs; getopt(args, "port", &port, "protocol", &protocolType, "server-type", @@ -279,8 +313,26 @@ void main(string[] args) { auto server = createServer(serverType, numIOThreads, taskPoolSize, processor, serverSocket, transportFactory, protocolFactory); + // Set up SIGINT signal handling + sigfn_t oldHandler = signal(SIGINT, &handleSignal); + enforce(oldHandler != SIG_ERR, + "Could not replace the SIGINT signal handler: errno {0}".format(errno())); + + // Set up a server cancellation trigger + auto cancel = new TCancellationOrigin(); + + // Set up a listener for the shutdown condition - this will + // wake up when the signal occurs and trigger cancellation. + auto shutdown = new ShutdownThread(cancel); + shutdown.start(); + + // Serve from this thread; the signal will stop the server + // and control will return here writefln("Starting %s/%s %s ThriftTest server %son port %s...", protocolType, transportType, serverType, ssl ? "(using SSL) ": "", port); - server.serve(); + server.serve(cancel); + shutdown.join(); + signal(SIGINT, SIG_DFL); + writeln("done."); } http://git-wip-us.apache.org/repos/asf/thrift/blob/9bea32f7/lib/perl/lib/Thrift/Server.pm ---------------------------------------------------------------------- diff --git a/lib/perl/lib/Thrift/Server.pm b/lib/perl/lib/Thrift/Server.pm index fc9ca30..f265d45 100644 --- a/lib/perl/lib/Thrift/Server.pm +++ b/lib/perl/lib/Thrift/Server.pm @@ -150,27 +150,31 @@ sub new sub serve { my $self = shift; - + my $stop = 0; + $self->{serverTransport}->listen(); - while (1) - { + while (!$stop) { my $client = $self->{serverTransport}->accept(); - my $itrans = $self->{inputTransportFactory}->getTransport($client); - my $otrans = $self->{outputTransportFactory}->getTransport($client); - my $iprot = $self->{inputProtocolFactory}->getProtocol($itrans); - my $oprot = $self->{outputProtocolFactory}->getProtocol($otrans); - eval { - $self->_clientBegin($iprot, $oprot); - while (1) - { - $self->{processor}->process($iprot, $oprot); + if (defined $client) { + my $itrans = $self->{inputTransportFactory}->getTransport($client); + my $otrans = $self->{outputTransportFactory}->getTransport($client); + my $iprot = $self->{inputProtocolFactory}->getProtocol($itrans); + my $oprot = $self->{outputProtocolFactory}->getProtocol($otrans); + eval { + $self->_clientBegin($iprot, $oprot); + while (1) + { + $self->{processor}->process($iprot, $oprot); + } + }; if($@) { + $self->_handleException($@); } - }; if($@) { - $self->_handleException($@); - } - $itrans->close(); - $otrans->close(); + $itrans->close(); + $otrans->close(); + } else { + $stop = 1; + } } } http://git-wip-us.apache.org/repos/asf/thrift/blob/9bea32f7/lib/perl/lib/Thrift/ServerSocket.pm ---------------------------------------------------------------------- diff --git a/lib/perl/lib/Thrift/ServerSocket.pm b/lib/perl/lib/Thrift/ServerSocket.pm index 51f83b4..2c4d906 100644 --- a/lib/perl/lib/Thrift/ServerSocket.pm +++ b/lib/perl/lib/Thrift/ServerSocket.pm @@ -81,15 +81,24 @@ sub accept { my $self = shift; - if ( exists $self->{handle} and defined $self->{handle} ) - { + if ( exists $self->{handle} and defined $self->{handle} ) { my $client = $self->{handle}->accept(); my $result = $self->__client(); $result->{handle} = new IO::Select($client); return $result; } - return 0; + return undef; +} + +sub close +{ + my $self = shift; + + if ( exists $self->{handle} and defined $self->{handle} ) + { + $self->{handle}->close(); + } } ### http://git-wip-us.apache.org/repos/asf/thrift/blob/9bea32f7/test/cpp/src/TestServer.cpp ---------------------------------------------------------------------- diff --git a/test/cpp/src/TestServer.cpp b/test/cpp/src/TestServer.cpp index b93e5ea..1c38124 100644 --- a/test/cpp/src/TestServer.cpp +++ b/test/cpp/src/TestServer.cpp @@ -49,6 +49,9 @@ #ifdef HAVE_INTTYPES_H #include <inttypes.h> #endif +#ifdef HAVE_SIGNAL_H +#include <signal.h> +#endif #include <iostream> #include <stdexcept> @@ -59,7 +62,6 @@ #include <boost/filesystem.hpp> #include <thrift/stdcxx.h> -#include <signal.h> #if _WIN32 #include <thrift/windows/TWinsockSingleton.h> #endif @@ -75,6 +77,17 @@ using namespace apache::thrift::server; using namespace thrift::test; +// to handle a controlled shutdown, signal handling is mandatory +#ifdef HAVE_SIGNAL_H +apache::thrift::concurrency::Monitor gMonitor; +void signal_handler(int signum) +{ + if (signum == SIGINT) { + gMonitor.notifyAll(); + } +} +#endif + class TestHandler : public ThriftTestIf { public: TestHandler() {} @@ -635,6 +648,12 @@ int main(int argc, char** argv) { ssl = true; } +#if defined(HAVE_SIGNAL_H) && defined(SIGPIPE) + if (ssl) { + signal(SIGPIPE, SIG_IGN); // for OpenSSL, otherwise we end abruptly + } +#endif + if (vm.count("abstract-namespace")) { abstract_namespace = true; } @@ -770,14 +789,14 @@ int main(int argc, char** argv) { TEvhttpServer nonblockingServer(testBufferProcessor, port); nonblockingServer.serve(); } else if (transport_type == "framed") { - stdcxx::shared_ptr<transport::TNonblockingServerTransport> nbSocket; - nbSocket.reset( - ssl ? new transport::TNonblockingSSLServerSocket(port, sslSocketFactory) - : new transport::TNonblockingServerSocket(port)); + stdcxx::shared_ptr<transport::TNonblockingServerTransport> nbSocket; + nbSocket.reset( + ssl ? new transport::TNonblockingSSLServerSocket(port, sslSocketFactory) + : new transport::TNonblockingServerSocket(port)); server.reset(new TNonblockingServer(testProcessor, protocolFactory, nbSocket)); } else { - cerr << "server-type nonblocking requires transport of http or framed" << endl; - exit(1); + cerr << "server-type nonblocking requires transport of http or framed" << endl; + exit(1); } } @@ -787,18 +806,23 @@ int main(int argc, char** argv) { // if using header server->setOutputProtocolFactory(stdcxx::shared_ptr<TProtocolFactory>()); } + apache::thrift::concurrency::PlatformThreadFactory factory; factory.setDetached(false); stdcxx::shared_ptr<apache::thrift::concurrency::Runnable> serverThreadRunner(server); stdcxx::shared_ptr<apache::thrift::concurrency::Thread> thread = factory.newThread(serverThreadRunner); - thread->start(); - // THRIFT-4515: this needs to be improved - while (1) { - THRIFT_SLEEP_SEC(1); // do something other than chew up CPU like crazy - } - // NOTREACHED +#ifdef HAVE_SIGNAL_H + signal(SIGINT, signal_handler); +#endif + + thread->start(); + gMonitor.waitForever(); // wait for a shutdown signal + +#ifdef HAVE_SIGNAL_H + signal(SIGINT, SIG_DFL); +#endif server->stop(); thread->join(); @@ -808,3 +832,4 @@ int main(int argc, char** argv) { cout << "done." << endl; return 0; } + http://git-wip-us.apache.org/repos/asf/thrift/blob/9bea32f7/test/crossrunner/collect.py ---------------------------------------------------------------------- diff --git a/test/crossrunner/collect.py b/test/crossrunner/collect.py index 03b0c36..e2d8978 100644 --- a/test/crossrunner/collect.py +++ b/test/crossrunner/collect.py @@ -51,6 +51,7 @@ VALID_JSON_KEYS = [ ] DEFAULT_MAX_DELAY = 5 +DEFAULT_SIGNAL = 1 DEFAULT_TIMEOUT = 5 @@ -112,7 +113,7 @@ def _do_collect_tests(servers, clients): yield name, impl1, impl2 def maybe_max(key, o1, o2, default): - """maximum of two if present, otherwise defult value""" + """maximum of two if present, otherwise default value""" v1 = o1.get(key) v2 = o2.get(key) return max(v1, v2) if v1 and v2 else v1 or v2 or default @@ -138,6 +139,7 @@ def _do_collect_tests(servers, clients): 'server': merge_metadata(sv, **{'protocol': proto1, 'transport': trans1}), 'client': merge_metadata(cl, **{'protocol': proto2, 'transport': trans2}), 'delay': maybe_max('delay', sv, cl, DEFAULT_MAX_DELAY), + 'stop_signal': maybe_max('stop_signal', sv, cl, DEFAULT_SIGNAL), 'timeout': maybe_max('timeout', sv, cl, DEFAULT_TIMEOUT), 'protocol': proto, 'transport': trans, http://git-wip-us.apache.org/repos/asf/thrift/blob/9bea32f7/test/crossrunner/report.py ---------------------------------------------------------------------- diff --git a/test/crossrunner/report.py b/test/crossrunner/report.py index 76324ed..75f36db 100644 --- a/test/crossrunner/report.py +++ b/test/crossrunner/report.py @@ -157,8 +157,10 @@ class ExecReporter(TestReporter): ])), 'client': list(map(re.compile, [ '[Cc]onnection refused', - 'Could not connect to localhost', + 'Could not connect to', 'ECONNREFUSED', + 'econnrefused', # erl + 'CONNECTION-REFUSED-ERROR', # cl 'No such file or directory', # domain socket ])), } @@ -174,6 +176,7 @@ class ExecReporter(TestReporter): def match(line): for expr in exprs: if expr.search(line): + self._log.info("maybe false positive: %s" % line) return True with logfile_open(self.logpath, 'r') as fp: @@ -204,7 +207,7 @@ class ExecReporter(TestReporter): def _print_footer(self, returncode=None): self._print_bar() if returncode is not None: - print('Return code: %d' % returncode, file=self.out) + print('Return code: %d (negative values indicate kill by signal)' % returncode, file=self.out) else: print('Process is killed.', file=self.out) self._print_exec_time() @@ -261,7 +264,8 @@ class SummaryReporter(TestReporter): if not with_result: return '{:24s}{:18s}{:25s}'.format(name[:23], test.protocol[:17], trans[:24]) else: - return '{:24s}{:18s}{:25s}{:s}\n'.format(name[:23], test.protocol[:17], trans[:24], self._result_string(test)) + return '{:24s}{:18s}{:25s}{:s}\n'.format(name[:23], test.protocol[:17], + trans[:24], self._result_string(test)) def _print_test_header(self): self._print_bar() http://git-wip-us.apache.org/repos/asf/thrift/blob/9bea32f7/test/crossrunner/run.py ---------------------------------------------------------------------- diff --git a/test/crossrunner/run.py b/test/crossrunner/run.py index f522bb1..25c58ce 100644 --- a/test/crossrunner/run.py +++ b/test/crossrunner/run.py @@ -23,19 +23,20 @@ import multiprocessing.managers import os import platform import random -import signal import socket import subprocess import sys -import threading import time from .compat import str_join -from .test import TestEntry, domain_socket_path from .report import ExecReporter, SummaryReporter +from .test import TestEntry +from .util import domain_socket_path -RESULT_TIMEOUT = 128 RESULT_ERROR = 64 +RESULT_TIMEOUT = 128 +SIGNONE = 0 +SIGKILL = 15 # globals ports = None @@ -43,35 +44,18 @@ stop = None class ExecutionContext(object): - def __init__(self, cmd, cwd, env, report): + def __init__(self, cmd, cwd, env, stop_signal, is_server, report): self._log = multiprocessing.get_logger() - self.report = report self.cmd = cmd self.cwd = cwd self.env = env - self.timer = None + self.stop_signal = stop_signal + self.is_server = is_server + self.report = report self.expired = False self.killed = False self.proc = None - def _expire(self): - self._log.info('Timeout') - self.expired = True - self.kill() - - def kill(self): - self._log.debug('Killing process : %d' % self.proc.pid) - self.killed = True - if platform.system() != 'Windows': - try: - os.killpg(self.proc.pid, signal.SIGKILL) - except Exception: - self._log.info('Failed to kill process group', exc_info=sys.exc_info()) - try: - self.proc.kill() - except Exception: - self._log.info('Failed to kill process', exc_info=sys.exc_info()) - def _popen_args(self): args = { 'cwd': self.cwd, @@ -87,75 +71,125 @@ class ExecutionContext(object): args.update(preexec_fn=os.setsid) return args - def start(self, timeout=0): + def start(self): joined = str_join(' ', self.cmd) self._log.debug('COMMAND: %s', joined) self._log.debug('WORKDIR: %s', self.cwd) self._log.debug('LOGFILE: %s', self.report.logpath) self.report.begin() self.proc = subprocess.Popen(self.cmd, **self._popen_args()) - if timeout > 0: - self.timer = threading.Timer(timeout, self._expire) - self.timer.start() + self._log.debug(' PID: %d', self.proc.pid) + self._log.debug(' PGID: %d', os.getpgid(self.proc.pid)) return self._scoped() @contextlib.contextmanager def _scoped(self): yield self - self._log.debug('Killing scoped process') - if self.proc.poll() is None: - self.kill() - self.report.killed() + if self.is_server: + # the server is supposed to run until we stop it + if self.returncode is not None: + self.report.died() + else: + if self.stop_signal != SIGNONE: + if self.sigwait(self.stop_signal): + self.report.end(self.returncode) + else: + self.report.killed() + else: + self.sigwait(SIGKILL) else: - self._log.debug('Process died unexpectedly') - self.report.died() - - def wait(self): - self.proc.communicate() - if self.timer: - self.timer.cancel() - self.report.end(self.returncode) + # the client is supposed to exit normally + if self.returncode is not None: + self.report.end(self.returncode) + else: + self.sigwait(SIGKILL) + self.report.killed() + self._log.debug('[{0}] exited with return code {1}'.format(self.proc.pid, self.returncode)) + + # Send a signal to the process and then wait for it to end + # If the signal requested is SIGNONE, no signal is sent, and + # instead we just wait for the process to end; further if it + # does not end normally with SIGNONE, we mark it as expired. + # If the process fails to end and the signal is not SIGKILL, + # it re-runs with SIGKILL so that a real process kill occurs + # returns True if the process ended, False if it may not have + def sigwait(self, sig=SIGKILL, timeout=2): + try: + if sig != SIGNONE: + self._log.debug('[{0}] send signal {1}'.format(self.proc.pid, sig)) + if sig == SIGKILL: + self.killed = True + try: + if platform.system() != 'Windows': + os.killpg(os.getpgid(self.proc.pid), sig) + else: + self.proc.send_signal(sig) + except Exception: + self._log.info('[{0}] Failed to kill process'.format(self.proc.pid), exc_info=sys.exc_info()) + self._log.debug('[{0}] wait begin, timeout {1} sec(s)'.format(self.proc.pid, timeout)) + self.proc.communicate(timeout=timeout) + self._log.debug('[{0}] process ended with return code {1}'.format(self.proc.pid, self.returncode)) + self.report.end(self.returncode) + return True + except subprocess.TimeoutExpired: + self._log.info('[{0}] timeout waiting for process to end'.format(self.proc.pid)) + if sig == SIGNONE: + self.expired = True + return False if sig == SIGKILL else self.sigwait(SIGKILL, 1) + + # called on the client process to wait for it to end naturally + def wait(self, timeout): + self.sigwait(SIGNONE, timeout) @property def returncode(self): return self.proc.returncode if self.proc else None -def exec_context(port, logdir, test, prog): +def exec_context(port, logdir, test, prog, is_server): report = ExecReporter(logdir, test, prog) prog.build_command(port) - return ExecutionContext(prog.command, prog.workdir, prog.env, report) + return ExecutionContext(prog.command, prog.workdir, prog.env, prog.stop_signal, is_server, report) def run_test(testdir, logdir, test_dict, max_retry, async=True): logger = multiprocessing.get_logger() - def ensure_socket_open(proc, port, max_delay): - sleeped = 0.1 - time.sleep(sleeped) - sleep_step = 0.2 + def ensure_socket_open(sv, port, test): + slept = 0.1 + time.sleep(slept) + sleep_step = 0.1 while True: - # Create sockets every iteration because refused sockets cannot be - # reused on some systems. - sock4 = socket.socket() - sock6 = socket.socket(family=socket.AF_INET6) - try: - if sock4.connect_ex(('127.0.0.1', port)) == 0 \ - or sock6.connect_ex(('::1', port)) == 0: - return True - if proc.poll() is not None: - logger.warn('server process is exited') - return False - if sleeped > max_delay: - logger.warn('sleeped for %f seconds but server port is not open' % sleeped) - return False - time.sleep(sleep_step) - sleeped += sleep_step - finally: - sock4.close() - sock6.close() - logger.debug('waited %f sec for server port open' % sleeped) - return True + if slept > test.delay: + logger.warn('[{0}] slept for {1} seconds but server is not open'.format(sv.proc.pid, slept)) + return False + if test.socket == 'domain': + if not os.path.exists(domain_socket_path(port)): + logger.debug('[{0}] domain(unix) socket not available yet. slept for {1} seconds so far'.format(sv.proc.pid, slept)) + time.sleep(sleep_step) + slept += sleep_step + elif test.socket == 'abstract': + return True + else: + # Create sockets every iteration because refused sockets cannot be + # reused on some systems. + sock4 = socket.socket() + sock6 = socket.socket(family=socket.AF_INET6) + try: + if sock4.connect_ex(('127.0.0.1', port)) == 0 \ + or sock6.connect_ex(('::1', port)) == 0: + return True + if sv.proc.poll() is not None: + logger.warn('[{0}] server process is exited'.format(sv.proc.pid)) + return False + logger.debug('[{0}] socket not available yet. slept for {1} seconds so far'.format(sv.proc.pid, slept)) + time.sleep(sleep_step) + slept += sleep_step + finally: + sock4.close() + sock6.close() + logger.debug('[{0}] server ready - waited for {1} seconds'.format(sv.proc.pid, slept)) + return True try: max_bind_retry = 3 @@ -169,31 +203,27 @@ def run_test(testdir, logdir, test_dict, max_retry, async=True): logger.debug('Start') with PortAllocator.alloc_port_scoped(ports, test.socket) as port: logger.debug('Start with port %d' % port) - sv = exec_context(port, logdir, test, test.server) - cl = exec_context(port, logdir, test, test.client) + sv = exec_context(port, logdir, test, test.server, True) + cl = exec_context(port, logdir, test, test.client, False) logger.debug('Starting server') with sv.start(): - if test.socket in ('domain', 'abstract'): - time.sleep(0.1) - port_ok = True - else: - port_ok = ensure_socket_open(sv.proc, port, test.delay) + port_ok = ensure_socket_open(sv, port, test) if port_ok: connect_retry_count = 0 - max_connect_retry = 3 - connect_retry_wait = 0.5 + max_connect_retry = 12 + connect_retry_wait = 0.25 while True: if sv.proc.poll() is not None: logger.info('not starting client because server process is absent') break logger.debug('Starting client') - cl.start(test.timeout) - logger.debug('Waiting client') - cl.wait() + cl.start() + logger.debug('Waiting client (up to %d secs)' % test.timeout) + cl.wait(test.timeout) if not cl.report.maybe_false_positive() or connect_retry_count >= max_connect_retry: if connect_retry_count > 0 and connect_retry_count < max_connect_retry: - logger.warn('[%s]: Connected after %d retry (%.2f sec each)' % (test.server.name, connect_retry_count, connect_retry_wait)) + logger.info('[%s]: Connected after %d retry (%.2f sec each)' % (test.server.name, connect_retry_count, connect_retry_wait)) # Wait for 50ms to see if server does not die at the end. time.sleep(0.05) break @@ -205,12 +235,18 @@ def run_test(testdir, logdir, test_dict, max_retry, async=True): logger.warn('[%s]: Detected socket bind failure, retrying...', test.server.name) bind_retry_count += 1 else: - if cl.expired: - result = RESULT_TIMEOUT + result = RESULT_TIMEOUT if cl.expired else cl.returncode if cl.proc.poll() is not None else RESULT_ERROR + + # For servers that handle a controlled shutdown by signal + # if they are killed, or return an error code, that is a + # problem. For servers that are not signal-aware, we simply + # kill them off; if we didn't kill them off, something else + # happened (crashed?) + if test.server.stop_signal != 0: + if sv.killed or sv.returncode > 0: + result |= RESULT_ERROR else: - result = cl.proc.returncode if cl.proc else RESULT_ERROR if not sv.killed: - # Server died without being killed. result |= RESULT_ERROR if result == 0 or retry_count >= max_retry: http://git-wip-us.apache.org/repos/asf/thrift/blob/9bea32f7/test/crossrunner/test.py ---------------------------------------------------------------------- diff --git a/test/crossrunner/test.py b/test/crossrunner/test.py index 74fd916..633e926 100644 --- a/test/crossrunner/test.py +++ b/test/crossrunner/test.py @@ -22,22 +22,20 @@ import multiprocessing import os import sys from .compat import path_join -from .util import merge_dict - - -def domain_socket_path(port): - return '/tmp/ThriftTest.thrift.%d' % port +from .util import merge_dict, domain_socket_path class TestProgram(object): - def __init__(self, kind, name, protocol, transport, socket, workdir, command, env=None, + def __init__(self, kind, name, protocol, transport, socket, workdir, stop_signal, command, env=None, extra_args=[], extra_args2=[], join_args=False, **kwargs): + self.kind = kind self.name = name self.protocol = protocol self.transport = transport self.socket = socket self.workdir = workdir + self.stop_signal = stop_signal self.command = None self._base_command = self._fix_cmd_path(command) if env: http://git-wip-us.apache.org/repos/asf/thrift/blob/9bea32f7/test/crossrunner/util.py ---------------------------------------------------------------------- diff --git a/test/crossrunner/util.py b/test/crossrunner/util.py index e2d195a..c214df8 100644 --- a/test/crossrunner/util.py +++ b/test/crossrunner/util.py @@ -20,6 +20,10 @@ import copy +def domain_socket_path(port): + return '/tmp/ThriftTest.thrift.%d' % port + + def merge_dict(base, update): """Update dict concatenating list values""" res = copy.deepcopy(base) http://git-wip-us.apache.org/repos/asf/thrift/blob/9bea32f7/test/perl/TestServer.pl ---------------------------------------------------------------------- diff --git a/test/perl/TestServer.pl b/test/perl/TestServer.pl index 7d8f929..e8c1cfa 100644 --- a/test/perl/TestServer.pl +++ b/test/perl/TestServer.pl @@ -26,6 +26,8 @@ use Data::Dumper; use Getopt::Long qw(GetOptions); use Time::HiRes qw(gettimeofday); +$SIG{INT} = \&sigint_handler; + use lib '../../lib/perl/lib'; use lib 'gen-perl'; @@ -146,6 +148,12 @@ if ($opts{"domain-socket"}) { my $server = new Thrift::SimpleServer($processor, $serversocket, $transport, $protocol); print "Starting \"simple\" server ($opts{transport}/$opts{protocol}) listen on: $listening_on\n"; $server->serve(); +print "done.\n"; + +sub sigint_handler { + print "received SIGINT, stopping...\n"; + $server->stop(); +} ### ### Test server implementation http://git-wip-us.apache.org/repos/asf/thrift/blob/9bea32f7/test/test.py ---------------------------------------------------------------------- diff --git a/test/test.py b/test/test.py index 5a015ea..24e7c4e 100755 --- a/test/test.py +++ b/test/test.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file @@ -18,12 +18,13 @@ # under the License. # -# Apache Thrift - integration test suite +# +# Apache Thrift - integration (cross) test suite # # tests different server-client, protocol and transport combinations # -# This script supports python 2.7 and later. -# python 3.x is recommended for better stability. +# This script requires python 3.x due to the improvements in +# subprocess management that are needed for reliability. # from __future__ import print_function @@ -38,6 +39,12 @@ import sys import crossrunner from crossrunner.compat import path_join +# 3.3 introduced subprocess timeouts on waiting for child +req_version = (3, 3) +cur_version = sys.version_info +assert (cur_version >= req_version), "Python 3.3 or later is required for proper operation." + + ROOT_DIR = os.path.dirname(os.path.realpath(os.path.dirname(__file__))) TEST_DIR_RELATIVE = 'test' TEST_DIR = path_join(ROOT_DIR, TEST_DIR_RELATIVE) @@ -161,9 +168,11 @@ def main(argv): options.update_failures, options.print_failures) elif options.features is not None: features = options.features or ['.*'] - res = run_feature_tests(server_match, features, options.jobs, options.skip_known_failures, options.retry_count, options.regex) + res = run_feature_tests(server_match, features, options.jobs, + options.skip_known_failures, options.retry_count, options.regex) else: - res = run_cross_tests(server_match, client_match, options.jobs, options.skip_known_failures, options.retry_count, options.regex) + res = run_cross_tests(server_match, client_match, options.jobs, + options.skip_known_failures, options.retry_count, options.regex) return 0 if res else 1 http://git-wip-us.apache.org/repos/asf/thrift/blob/9bea32f7/test/tests.json ---------------------------------------------------------------------- diff --git a/test/tests.json b/test/tests.json index 671c667..9c7668d 100644 --- a/test/tests.json +++ b/test/tests.json @@ -63,7 +63,8 @@ "name": "d", "server": { "command": [ - "thrift_test_server" + "thrift_test_server", + "--trace" ] }, "client": { @@ -438,12 +439,12 @@ "compact", "json" ], - "server": { + "server": { "command": [ "dotnet", "run", - "--no-build", - "--project=Server/Server.csproj", + "--no-build", + "--project=Server/Server.csproj", "server" ] }, @@ -452,8 +453,8 @@ "command": [ "dotnet", "run", - "--no-build", - "--project=Client/Client.csproj", + "--no-build", + "--project=Client/Client.csproj", "client" ] },