This is an automated email from the ASF dual-hosted git repository. jdanek pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/main by this push: new f909ffb DISPATCH-2103 Log actual HTTP listener port number when 0 is configured (#1425) f909ffb is described below commit f909ffb13349faff99802f0d18b7740008775292 Author: Jiri Daněk <jda...@redhat.com> AuthorDate: Sat Nov 20 21:07:30 2021 +0100 DISPATCH-2103 Log actual HTTP listener port number when 0 is configured (#1425) Unit tests and related changes for the log message amount to much of the bulk of this commit. * Old g++ (RHEL 7) does not properly implement regexes * On Ubuntu and rarely Fedora, and all other modern Linuxes, the qd_lws_listener_free is sometimes not called in my test. This is resolved with the .finalize, available only in LibWebSockets 3.1 (RHEL 7 has version 3.0.1) --- src/http-libwebsockets.c | 51 ++++-- src/server.c | 2 +- tests/c_unittests/CMakeLists.txt | 6 +- tests/c_unittests/helpers.hpp | 59 +++++++ tests/c_unittests/minimal_silent.conf | 3 + .../{minimal_silent.conf => minimal_trace.conf} | 5 +- tests/c_unittests/test_listener_startup.cpp | 174 +++++++++++++++++++++ 7 files changed, 287 insertions(+), 13 deletions(-) diff --git a/src/http-libwebsockets.c b/src/http-libwebsockets.c index 02d7994..652477f 100644 --- a/src/http-libwebsockets.c +++ b/src/http-libwebsockets.c @@ -33,6 +33,10 @@ #include <inttypes.h> #include <libwebsockets.h> +#if LWS_LIBRARY_VERSION_MAJOR > 3 || (LWS_LIBRARY_VERSION_MAJOR == 3 && LWS_LIBRARY_VERSION_MINOR >= 2) +#define QD_HAVE_MODERN_LIBWEBSOCKETS 1 +#endif + static const char *CIPHER_LIST = "ALL:aNULL:!eNULL:@STRENGTH"; /* Default */ static const char *IGNORED = "ignore-this-log-message"; @@ -116,6 +120,8 @@ static qd_log_source_t *wsi_log(struct lws *wsi); /* Declare LWS callbacks and protocol list */ +inline static void finalize_http(struct lws_vhost *vh, void *arg); + static int callback_http(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len); static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason, @@ -374,7 +380,7 @@ static void listener_start(qd_lws_listener_t *hl, qd_http_server_t *hs) { info.options |= LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT | -#if LWS_LIBRARY_VERSION_MAJOR > 3 || (LWS_LIBRARY_VERSION_MAJOR == 3 && LWS_LIBRARY_VERSION_MINOR >= 2) +#ifdef QD_HAVE_MODERN_LIBWEBSOCKETS (config->ssl_required ? 0 : LWS_SERVER_OPTION_ALLOW_NON_SSL_ON_SSL_PORT | LWS_SERVER_OPTION_ALLOW_HTTP_ON_HTTPS_LISTENER) | #else (config->ssl_required ? 0 : LWS_SERVER_OPTION_ALLOW_NON_SSL_ON_SSL_PORT) | @@ -382,17 +388,31 @@ static void listener_start(qd_lws_listener_t *hl, qd_http_server_t *hs) { ((config->requireAuthentication && info.ssl_ca_filepath) ? LWS_SERVER_OPTION_REQUIRE_VALID_OPENSSL_CLIENT_CERT : 0); } info.vhost_name = hl->listener->config.host_port; +#ifdef QD_HAVE_MODERN_LIBWEBSOCKETS + info.finalize = finalize_http; + info.finalize_arg = hl; +#endif hl->vhost = lws_create_vhost(hs->context, &info); - if (hl->vhost) { - /* Store hl pointer in vhost */ - void *vp = lws_protocol_vh_priv_zalloc(hl->vhost, &protocols[0], sizeof(hl)); - memcpy(vp, &hl, sizeof(hl)); - qd_log(hs->log, QD_LOG_NOTICE, "Listening for HTTP on %s", config->host_port); - return; - } else { + if (!hl->vhost) { qd_log(hs->log, QD_LOG_NOTICE, "Error listening for HTTP on %s", config->host_port); goto error; } + + /* Store hl pointer in vhost */ + void *vp = lws_protocol_vh_priv_zalloc(hl->vhost, &protocols[0], sizeof(hl)); + memcpy(vp, &hl, sizeof(hl)); + + if (port == 0) { + // If a 0 (zero) is specified for a port, get the actual listening port from the listener. + const int resolved_port = lws_get_vhost_port(hl->vhost); + assert(resolved_port != -1); // already checked the vhost is successfully started + if (config->name) + qd_log(hs->log, QD_LOG_NOTICE, "Listening for HTTP on %s:%d (%s)", config->host, resolved_port, config->name); + else + qd_log(hs->log, QD_LOG_NOTICE, "Listening for HTTP on %s:%d", config->host, resolved_port); + } else { + qd_log(hs->log, QD_LOG_NOTICE, "Listening for HTTP on %s", config->host_port); + } return; error: @@ -417,17 +437,28 @@ static void listener_close(qd_lws_listener_t *hl, qd_http_server_t *hs) { static int callback_http(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) { + /* + * Modern LibWebSockets uses the .finalize callback on lws_context for cleanup + */ +#ifndef QD_HAVE_MODERN_LIBWEBSOCKETS switch (reason) { case LWS_CALLBACK_PROTOCOL_DESTROY: - qd_lws_listener_free(wsi_listener(wsi)); + finalize_http(NULL, wsi_listener(wsi)); break; - default: + default: break; } +#endif + /* Do default HTTP handling for all the cases we don't care about. */ return lws_callback_http_dummy(wsi, reason, user, in, len); } +inline static void finalize_http(struct lws_vhost *vh /*unused*/, void *arg) { + qd_lws_listener_t *listener = (qd_lws_listener_t*) arg; + qd_lws_listener_free(listener); +} + /* Wake up a connection managed by the http server thread */ static void connection_wake(qd_connection_t *qd_conn) { diff --git a/src/server.c b/src/server.c index e936593..26e1342 100644 --- a/src/server.c +++ b/src/server.c @@ -1660,7 +1660,7 @@ qd_listener_t *qd_server_listener(qd_server_t *server) } static bool qd_listener_listen_pn(qd_listener_t *li) { - li->pn_listener = pn_listener(); + li->pn_listener = pn_listener(); if (li->pn_listener) { pn_listener_set_context(li->pn_listener, &li->type); pn_proactor_listen(li->server->proactor, li->pn_listener, li->config.host_port, diff --git a/tests/c_unittests/CMakeLists.txt b/tests/c_unittests/CMakeLists.txt index 6c19e17..be1364f 100644 --- a/tests/c_unittests/CMakeLists.txt +++ b/tests/c_unittests/CMakeLists.txt @@ -33,15 +33,19 @@ add_executable(c_unittests helpers.cpp helpers.hpp test_amqp.cpp + test_listener_startup.cpp test_router_startup.cpp test_terminus.cpp $<TARGET_OBJECTS:qpid-dispatch>) target_link_libraries(c_unittests pthread qpid-dispatch-libraries) -file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/minimal_silent.conf +file(COPY + ${CMAKE_CURRENT_SOURCE_DIR}/minimal_silent.conf + ${CMAKE_CURRENT_SOURCE_DIR}/minimal_trace.conf DESTINATION ${CMAKE_CURRENT_BINARY_DIR}) add_test( NAME c_unittests COMMAND ${TEST_WRAP} $<TARGET_FILE:c_unittests> + WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR} ) diff --git a/tests/c_unittests/helpers.hpp b/tests/c_unittests/helpers.hpp index 899a00c..77aadb3 100644 --- a/tests/c_unittests/helpers.hpp +++ b/tests/c_unittests/helpers.hpp @@ -20,6 +20,8 @@ #ifndef QPID_DISPATCH_HELPERS_HPP #define QPID_DISPATCH_HELPERS_HPP +#include <unistd.h> + #include <cassert> #include <condition_variable> #include <fstream> @@ -144,6 +146,9 @@ class WithNoMemoryLeaks /// Submits an action to the router's action list. When action runs, we know router finished all previous actions. /// /// This can be used to detect the router finished starting (i.e., performed all previously scheduled actions). +/// +/// Enqueued actions get processed on the router core thread, one by one. These qdr_actions are different from Proton +/// proactor events that get processed in router's worker threads. Use qd timeouts to schedule on worker threads. class RouterStartupLatch { public: @@ -211,6 +216,7 @@ class QDR } else { // this is the abbreviated setup load_config() calls from Python, this way we can sometimes skip loading a // config file + qd->thread_count = 1; REQUIRE(qd_dispatch_prepare(qd) == QD_ERROR_NONE); qd_router_setup_late(qd); // sets up e.g. qd->router->router_core } @@ -237,6 +243,19 @@ class QDR qd_server_stop(qd); } + /// Schedules QDR.stop using qd_timer + /// + /// The returned value must outlive the end of timer activation! + std::unique_ptr<qd_timer_t, void (*)(qd_timer_t *)> schedule_stop(int timeout = 0) const + { + qd_timer_t *timer = qd_timer(qd, [](void* context) { + QDR *that = static_cast<QDR*>(context); + that->stop(); + }, (void*)this); + qd_timer_schedule(timer, timeout); + return qd_make_unique(timer, qd_timer_free); + } + /// Frees the router and optionally checks for leaks. void deinitialize(bool check_leaks = true) const { @@ -289,4 +308,44 @@ class QDRMinimalEnv } }; +class CaptureCStream +{ + FILE **mStream; + FILE *mMemstream; + FILE *mOriginal; + + char *buf; + size_t size; + public: + CaptureCStream(FILE **stream) : mStream(stream), mOriginal(*stream) { + mMemstream = open_memstream(&buf, &size); + *mStream = mMemstream; + } + + void reset() { + *mStream = mOriginal; + } + + size_t checkpoint() { + fflush(mMemstream); + return size; + } + + std::string str() { + fflush(mMemstream); + return std::string(buf, size); + } + + std::string str(size_t begin) { + fflush(mMemstream); + return std::string(buf + begin, size - begin); + } + + ~CaptureCStream() { + reset(); + fclose(mMemstream); + free(buf); + } +}; + #endif // QPID_DISPATCH_HELPERS_HPP diff --git a/tests/c_unittests/minimal_silent.conf b/tests/c_unittests/minimal_silent.conf index 6efa5d4..28d0baf 100644 --- a/tests/c_unittests/minimal_silent.conf +++ b/tests/c_unittests/minimal_silent.conf @@ -17,6 +17,9 @@ ## under the License ## +router { + workerThreads: 1 +} log { module: DEFAULT diff --git a/tests/c_unittests/minimal_silent.conf b/tests/c_unittests/minimal_trace.conf similarity index 94% copy from tests/c_unittests/minimal_silent.conf copy to tests/c_unittests/minimal_trace.conf index 6efa5d4..6d30ab7 100644 --- a/tests/c_unittests/minimal_silent.conf +++ b/tests/c_unittests/minimal_trace.conf @@ -17,8 +17,11 @@ ## under the License ## +router { + workerThreads: 1 +} log { module: DEFAULT - enable: warn+ + enable: trace+ } diff --git a/tests/c_unittests/test_listener_startup.cpp b/tests/c_unittests/test_listener_startup.cpp new file mode 100644 index 0000000..03c5fed --- /dev/null +++ b/tests/c_unittests/test_listener_startup.cpp @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "./qdr_doctest.hpp" +#include "./helpers.hpp" // must come after ./qdr_doctest.hpp + +#include <proton/listener.h> + +#include <regex> +#include <thread> + +extern "C" { +qd_listener_t *qd_dispatch_configure_listener(qd_dispatch_t *qd, qd_entity_t *entity); +void qd_connection_manager_delete_listener(qd_dispatch_t *qd, void *impl); +} + + +/// GCC 4.8 made a questionable choice to implement std::regex_search to always +/// return false. Meaning that tests cannot use regex on RHEL 7 +static bool regex_is_broken() { + return !std::regex_search("", std::regex("")); +} + +void check_amqp_listener_startup_log_message(qd_server_config_t config, std::string listen, std::string stop) +{ + QDR qdr{}; + CaptureCStream css(&stderr); + qdr.initialize("./minimal_trace.conf"); + + qd_listener_t *li = qd_server_listener(qdr.qd->server); + li->config = config; + + CHECK(qd_listener_listen(li)); + pn_listener_close(li->pn_listener); + { + /* AMQP socket is opened (and closed) only when proactor loop runs; meaning router has to be started */ + auto timer = qdr.schedule_stop(0); + qdr.run(); + } + + qd_listener_decref(li); + qdr.deinitialize(); + + std::string logging = css.str(); + CHECK_MESSAGE(std::regex_search(logging, std::regex(listen)), + listen, " not found in ", logging); + CHECK_MESSAGE(std::regex_search(logging, std::regex(stop)), + stop, " not found in ", logging); +} + +void check_http_listener_startup_log_message(qd_server_config_t config, std::string listen, std::string stop, std::string failed) +{ + QDR qdr{}; + CaptureCStream css(&stderr); + qdr.initialize("./minimal_trace.conf"); + + qd_listener_t *li = qd_server_listener(qdr.qd->server); + li->config = config; + + const bool http_supported = qd_server_http(qdr.qd->server) != nullptr; + + CHECK(qd_listener_listen(li) == http_supported); + qdr.wait(); + qd_lws_listener_close(li->http); + qd_listener_decref(li); + { + auto timer = qdr.schedule_stop(0); + qdr.run(); + } + + qdr.deinitialize(); + + std::string logging = css.str(); + const std::string unavailable = "SERVER (warning) HTTP support is not available"; + CHECK_MESSAGE((logging.find(unavailable) == std::string::npos) == http_supported, + unavailable, " (not) found in ", logging); + + CHECK_MESSAGE(std::regex_search(logging, std::regex(listen)) == http_supported, + listen, " (not) found in ", logging); + CHECK_MESSAGE(std::regex_search(logging, std::regex(stop)) == http_supported, + stop, " (not) found in ", logging); + + CHECK_MESSAGE(std::regex_search(logging, std::regex(failed)) != http_supported, + failed, " (not) found in ", logging); + +} + +TEST_CASE("Start AMQP listener with zero port" * doctest::skip(regex_is_broken())) +{ + std::thread([] { + qd_server_config_t config{}; + config.port = strdup("0"); + config.host = strdup("localhost"); + config.host_port = strdup("localhost:0"); + + check_amqp_listener_startup_log_message( + config, + R"EOS(SERVER \(notice\) Listening on (127.0.0.1)|(::1):(\d\d+))EOS", + R"EOS(SERVER \(trace\) Listener closed on localhost:0)EOS" + ); + }).join(); +} + +TEST_CASE("Start AMQP listener with zero port and a name" * doctest::skip(regex_is_broken())) +{ + std::thread([] { + qd_server_config_t config{}; + config.name = strdup("pepa"); + config.port = strdup("0"); + config.host = strdup("localhost"); + config.host_port = strdup("localhost:0"); + + check_amqp_listener_startup_log_message( + config, + R"EOS(SERVER \(notice\) Listening on (127.0.0.1)|(::1):(\d\d+) \(pepa\))EOS", + R"EOS(SERVER \(trace\) Listener closed on localhost:0)EOS" + ); + }).join(); +} + +TEST_CASE("Start HTTP listener with zero port" * doctest::skip(regex_is_broken())) +{ + std::thread([] { + qd_server_config_t config{}; + config.port = strdup("0"); + config.host = strdup("localhost"); + config.host_port = strdup("localhost:0"); + config.http = true; + + check_http_listener_startup_log_message( + config, + R"EOS(SERVER \(notice\) Listening for HTTP on localhost:(\d\d+))EOS", + R"EOS(SERVER \(notice\) Stopped listening for HTTP on localhost:0)EOS", + + R"EOS(SERVER \(error\) No HTTP support to listen on localhost:0)EOS" + ); + }).join(); +} + +TEST_CASE("Start HTTP listener with zero port and a name" * doctest::skip(regex_is_broken())) +{ + std::thread([] { + qd_server_config_t config{}; + config.name = strdup("pepa"); + config.port = strdup("0"); + config.host = strdup("localhost"); + config.host_port = strdup("localhost:0"); + config.http = true; + + check_http_listener_startup_log_message( + config, + R"EOS(SERVER \(notice\) Listening for HTTP on localhost:(\d\d+))EOS", + R"EOS(SERVER \(notice\) Stopped listening for HTTP on localhost:0)EOS", + + R"EOS(SERVER \(error\) No HTTP support to listen on localhost:0)EOS" + ); + }).join(); +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org