This is an automated email from the ASF dual-hosted git repository. gmurthy pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/main by this push: new 4d2889e DISPATCH-1896 - Propagate the GOAWAY frame received from the http2 server all the way back to the client. This closes #1242. 4d2889e is described below commit 4d2889e8dbf2dd96080f1c3429db55d3d36c7faf Author: Ganesh Murthy <gmur...@apache.org> AuthorDate: Thu Jan 7 22:03:44 2021 -0500 DISPATCH-1896 - Propagate the GOAWAY frame received from the http2 server all the way back to the client. This closes #1242. --- .github/workflows/build.yaml | 4 +- .travis.yml | 4 +- README | 10 ++-- dockerfiles/Dockerfile-fedora | 6 +-- dockerfiles/Dockerfile-ubuntu | 7 +-- src/adaptors/http2/http2_adaptor.c | 103 +++++++++++++++++++++++++---------- src/adaptors/http2/http2_adaptor.h | 1 + tests/hyperh2_server.py | 107 +++++++++++++++++++++++++++++++++++++ tests/system_tests_http2.py | 59 +++++++++++++++++++- 9 files changed, 252 insertions(+), 49 deletions(-) diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 43690b1..8120d2e 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -202,7 +202,7 @@ jobs: architecture: x64 - name: Install Python runtime/test dependencies - run: python -m pip install tox quart selectors grpcio protobuf pytest + run: python -m pip install tox quart selectors h2 grpcio protobuf pytest - name: Install Linux runtime/test dependencies if: ${{ runner.os == 'Linux' }} @@ -395,7 +395,7 @@ jobs: run: env -0 | sort -z | tr '\0' '\n' - name: Install Python runtime/test dependencies - run: python3 -m pip install tox quart selectors protobuf pytest + run: python3 -m pip install tox quart selectors h2 protobuf pytest - name: Install Python runtime/test dependencies (Fedora) if: ${{ matrix.container == 'fedora' }} diff --git a/.travis.yml b/.travis.yml index c2c004f..02dc77b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -78,8 +78,8 @@ jobs: # https://github.com/pypa/virtualenv/issues/1873 - python -m pip install --user --upgrade pip - 
python -m pip install --user --upgrade tox virtualenv==20.0.23 - # Install quart to run the http2 tests. - - python -m pip install --user quart + # Install quart, h2 to run the http2 tests. + - python -m pip install --user quart h2 # DISPATCH-1883: Install selectors to run tcp echo server/client tools - python -m pip install --user selectors # Install grpcio and protobuf to run the grpc tests. diff --git a/README b/README index b0c3f9e..813159b 100644 --- a/README +++ b/README @@ -54,21 +54,19 @@ $ ./run.py -m unittest system_tests_qdstat Run it without arguments to get a summary of how it can be used: $ ./run.py -The HTTP2 system tests (tests/system_tests_http2.py) use the Python Quart framework to start a HTTP2 server. +The HTTP2 system tests (tests/system_tests_http2.py) use the Python Quart and hyper-h2 frameworks to start a HTTP2 server. The HTTP2 system tests will run only if 1. Python version >= 3.7 2. Python Web Microframework Quart version >= 0.13 3. curl is available + 4. hyper-h2 is available (pure-Python implementation of a HTTP/2 protocol stack) The TCP system tests (tests/system_tests_tcp_adaptor.py) use the Python selectors module when running echo clients and servers. The TCP system tests run only if Python selectors is available. -To install pip, Quart, and selectors - - curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py - - python3 get-pip.py - - pip3 install --user quart - - pip3 install --user selectors +To install quart, h2 and selectors + - pip3 install --user quart h2 selectors The gRPC system tests (tests/system_tests_grpc.py) use grpcio and protobuf modules. To install them use: diff --git a/dockerfiles/Dockerfile-fedora b/dockerfiles/Dockerfile-fedora index 82cd362..6af7335 100644 --- a/dockerfiles/Dockerfile-fedora +++ b/dockerfiles/Dockerfile-fedora @@ -32,11 +32,7 @@ MAINTAINER "d...@qpid.apache.org" # Install required packages. 
Some in this list are from proton's INSTALL.md (https://github.com/apache/qpid-proton/blob/main/INSTALL.md) and the rest are from dispatch (https://github.com/apache/qpid-dispatch/blob/main/README) RUN dnf -y install gcc gcc-c++ cmake openssl-devel cyrus-sasl-devel cyrus-sasl-plain cyrus-sasl-gssapi cyrus-sasl-md5 swig java-1.8.0-openjdk-devel git make valgrind emacs libwebsockets-devel python-devel libnghttp2-devel curl -RUN curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py -RUN python3 get-pip.py -RUN pip3 install quart -RUN pip3 install selectors -RUN pip3 install grpcio protobuf +RUN pip3 install quart grpcio protobuf h2 selectors # Create a main directory and clone the qpid-proton repo from github RUN mkdir /main && cd /main && git clone https://gitbox.apache.org/repos/asf/qpid-proton.git && cd /main/qpid-proton && mkdir /main/qpid-proton/build diff --git a/dockerfiles/Dockerfile-ubuntu b/dockerfiles/Dockerfile-ubuntu index 7e11984..26eef78 100644 --- a/dockerfiles/Dockerfile-ubuntu +++ b/dockerfiles/Dockerfile-ubuntu @@ -29,12 +29,7 @@ RUN apt-get update && \ apt-get install -y curl gcc g++ automake libwebsockets-dev libtool zlib1g-dev cmake libsasl2-dev libssl-dev libnghttp2-dev python3-dev libuv1-dev sasl2-bin swig maven git && \ apt-get -y clean -RUN curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py -RUN python3 get-pip.py -RUN pip3 install quart -RUN pip3 install selectors -RUN pip3 install grpcio protobuf - +RUN pip3 install quart selectors grpcio protobuf h2 RUN git clone https://gitbox.apache.org/repos/asf/qpid-dispatch.git && cd /qpid-dispatch && git submodule add https://gitbox.apache.org/repos/asf/qpid-proton.git && git submodule update --init diff --git a/src/adaptors/http2/http2_adaptor.c b/src/adaptors/http2/http2_adaptor.c index 92de651..cfbf560 100644 --- a/src/adaptors/http2/http2_adaptor.c +++ b/src/adaptors/http2/http2_adaptor.c @@ -82,6 +82,31 @@ static void free_all_connection_streams(qdr_http2_connection_t *http_conn, bool } } 
+/** + * All streams with id greater than the last_stream_id will be freed. + */ +static void free_unprocessed_streams(qdr_http2_connection_t *http_conn, int32_t last_stream_id) +{ + qdr_http2_stream_data_t *stream_data = DEQ_HEAD(http_conn->session_data->streams); + while (stream_data) { + int32_t stream_id = stream_data->stream_id; + + // + // This stream_id is greater than the last_stream_id, this stream will not be processed by the http server + // and hence needs to be freed. + // + if (stream_id > last_stream_id) { + qdr_http2_stream_data_t *next_stream_data = DEQ_NEXT(stream_data); + qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] Freeing stream in free_last_id_streams", stream_data->session_data->conn->conn_id, stream_data->stream_id); + free_http2_stream_data(stream_data, false); + stream_data = next_stream_data; + } + else { + stream_data = DEQ_NEXT(stream_data); + } + } +} + static void set_stream_data_delivery_flags(qdr_http2_stream_data_t * stream_data, qdr_delivery_t *dlv) { if (dlv == stream_data->in_dlv) { stream_data->in_dlv_decrefed = true; @@ -244,6 +269,10 @@ qd_composed_field_t *qd_message_compose_amqp(qd_message_t *msg, static size_t write_buffers(qdr_http2_connection_t *conn) { qdr_http2_session_data_t *session_data = conn->session_data; + + if (!conn->pn_raw_conn) + return 0; + size_t pn_buffs_to_write = pn_raw_connection_write_buffers_capacity(conn->pn_raw_conn); qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"] write_buffers pn_raw_connection_write_buffers_capacity=%zu", conn->conn_id, pn_buffs_to_write); @@ -918,6 +947,31 @@ static int on_frame_recv_callback(nghttp2_session *session, qdr_http2_stream_data_t *stream_data = nghttp2_session_get_stream_user_data(session_data->session, stream_id); switch (frame->hd.type) { + case NGHTTP2_GOAWAY: { + // + // A GOAWAY frame has been received from the HTTP2 server. Usually a server sends a GOAWAY but nothing prevents the client from sending one. 
+ // + // "The GOAWAY frame is used to initiate shutdown of a connection or to signal serious error conditions. GOAWAY allows an + // endpoint to gracefully stop accepting new streams while still + // finishing processing of previously established streams. This enables administrative actions, like server maintenance. + // Receivers of a GOAWAY frame MUST NOT open additional streams on the connection, although a new connection can be established for new streams." + // + // We will close any unprocessed streams on the connection. In doing so, all the outstanding deliveries on that connection will be PN_RELEASED which will in turn release all the peer + // deliveries on the client side which will enable us to send a GOAWAY frame to the client. This is how we propagate a GOAWAY received from the server side to the client side. + // + // We will also close the pn_raw_connection (we will not close the qdr_connection_t and the qdr_http2_connection_t, those will still remain). This will close the TCP connection to the server + // and will enable creation of a new connection to the server since we are not allowed to create any more streams on the connection that received the GOAWAY frame. + // + qd_log(http2_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%"PRIu64"][S%"PRId32"] GOAWAY frame received", conn->conn_id, stream_id); + int32_t last_stream_id = frame->goaway.last_stream_id; + // Free all streams that are greater than the last_stream_id because the server is not going to process those streams. 
+ free_unprocessed_streams(conn, last_stream_id); + conn->goaway_received = true; + pn_raw_connection_close(conn->pn_raw_conn); + qd_log(http2_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%"PRIu64"][S%"PRId32"] pn_raw_connection closed after GOAWAY frame received", conn->conn_id, stream_id); + return 0; + } + break; case NGHTTP2_PING: { qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] HTTP2 PING frame received", conn->conn_id, stream_id); } @@ -1432,49 +1486,36 @@ static void qdr_http_delivery_update(void *context, qdr_delivery_t *dlv, uint64_ } if (settled) { - nghttp2_nv hdrs[2]; + nghttp2_nv hdrs[3]; if (conn->ingress && (disp == PN_RELEASED || disp == PN_MODIFIED || disp == PN_REJECTED)) { if (disp == PN_RELEASED || disp == PN_MODIFIED) { - uint8_t * error_msg = (uint8_t *)"Service Unavailable"; hdrs[0].name = (uint8_t *)":status"; hdrs[0].value = (uint8_t *)"503"; hdrs[0].namelen = 7; hdrs[0].valuelen = 3; hdrs[0].flags = NGHTTP2_NV_FLAG_NONE; - - hdrs[1].name = (uint8_t *)":content-type"; - hdrs[1].value = (uint8_t *)"text/plain"; - hdrs[1].namelen = 13; - hdrs[1].valuelen = 10; - hdrs[1].flags = NGHTTP2_NV_FLAG_NONE; - - nghttp2_data_provider data_prd; - data_prd.read_callback = error_read_callback; - data_prd.source.ptr = error_msg; - - nghttp2_submit_response(stream_data->session_data->session, stream_data->stream_id, hdrs, 2, &data_prd); - nghttp2_submit_goaway(stream_data->session_data->session, 0, stream_data->stream_id, NGHTTP2_CONNECT_ERROR, error_msg, 19); } else if (disp == PN_REJECTED) { - uint8_t * error_msg = (uint8_t *)"Resource Unavailable"; hdrs[0].name = (uint8_t *)":status"; hdrs[0].value = (uint8_t *)"400"; hdrs[0].namelen = 7; hdrs[0].valuelen = 3; hdrs[0].flags = NGHTTP2_NV_FLAG_NONE; + } - hdrs[1].name = (uint8_t *)":content-type"; - hdrs[1].value = (uint8_t *)"text/plain"; - hdrs[1].namelen = 13; - hdrs[1].valuelen = 10; - hdrs[1].flags = NGHTTP2_NV_FLAG_NONE; + hdrs[1].name = (uint8_t 
*)"content-type"; + hdrs[1].value = (uint8_t *)"text/html; charset=utf-8"; + hdrs[1].namelen = 12; + hdrs[1].valuelen = 24; + hdrs[1].flags = NGHTTP2_NV_FLAG_NONE; - nghttp2_data_provider data_prd; - data_prd.read_callback = error_read_callback; - data_prd.source.ptr = error_msg; + hdrs[2].name = (uint8_t *)"content-length"; + hdrs[2].value = (uint8_t *)"0"; + hdrs[2].namelen = 14; + hdrs[2].valuelen = 1; + hdrs[2].flags = NGHTTP2_NV_FLAG_NONE; - nghttp2_submit_response(stream_data->session_data->session, stream_data->stream_id, hdrs, 2, &data_prd); - } + nghttp2_submit_headers(stream_data->session_data->session, NGHTTP2_FLAG_END_HEADERS | NGHTTP2_FLAG_END_STREAM, stream_data->stream_id, NULL, hdrs, 3, 0); } if (!conn->ingress && (disp == PN_RELEASED || disp == PN_MODIFIED || disp == PN_REJECTED)) { @@ -2026,6 +2067,7 @@ static int handle_incoming_http(qdr_http2_connection_t *conn) return count; } + qdr_http2_connection_t *qdr_http_connection_ingress_accept(qdr_http2_connection_t* ingress_http_conn) { ingress_http_conn->remote_address = get_address_string(ingress_http_conn->pn_raw_conn); @@ -2378,6 +2420,7 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void qd_log_source_t *log = http2_adaptor->log_source; switch (pn_event_type(e)) { case PN_RAW_CONNECTION_CONNECTED: { + conn->goaway_received = false; if (conn->ingress) { qdr_http_connection_ingress_accept(conn); send_settings_frame(conn); @@ -2397,7 +2440,8 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void break; } case PN_RAW_CONNECTION_CLOSED_READ: { - pn_raw_connection_close(conn->pn_raw_conn); + if (conn->pn_raw_conn) + pn_raw_connection_close(conn->pn_raw_conn); qd_log(log, QD_LOG_TRACE, "[C%"PRIu64"] PN_RAW_CONNECTION_CLOSED_READ", conn->conn_id); break; } @@ -2418,6 +2462,10 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void } } conn->connection_established = false; + if (conn->goaway_received) { + 
nghttp2_session_del(conn->session_data->session); + conn->session_data->session = 0; + } handle_disconnected(conn); break; } @@ -2453,6 +2501,7 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void pn_raw_buffer_t buffs[WRITE_BUFFERS]; size_t n; size_t written = 0; + if (conn->pn_raw_conn == 0) { qd_log(log, QD_LOG_TRACE, "[C%"PRIu64"] PN_RAW_CONNECTION_WRITTEN, No pn_raw_conn", conn->conn_id); break; diff --git a/src/adaptors/http2/http2_adaptor.h b/src/adaptors/http2/http2_adaptor.h index 37e3e52..26fb161 100644 --- a/src/adaptors/http2/http2_adaptor.h +++ b/src/adaptors/http2/http2_adaptor.h @@ -149,6 +149,7 @@ struct qdr_http2_connection_t { bool woken_by_ping; bool first_pinged; bool delete_egress_connections; // If set to true, the egress qdr_connection_t and qdr_http2_connection_t objects will be deleted + bool goaway_received; DEQ_LINKS(qdr_http2_connection_t); }; diff --git a/tests/hyperh2_server.py b/tests/hyperh2_server.py new file mode 100644 index 0000000..13e5798 --- /dev/null +++ b/tests/hyperh2_server.py @@ -0,0 +1,107 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +import socket + +import signal +import sys +import os +import h2.connection +import h2.events +import h2.config +import h2.errors + +BYTES = 65535 + + +def receive_signal(signalNumber, frame): + print('Received:', signalNumber) + sys.exit(0) + + +def handle_goaway_test_1(event, conn): + """ + conn.close_connection() sends a goaway frame to the client + and closes the connection. + """ + # When a request is made on the URL "/goaway_test_1", we immediately close + # the connection which sends a GOAWAY frame. + conn.close_connection(error_code=h2.errors.ErrorCodes.NO_ERROR, + additional_data=None, + last_stream_id=0) + + +def handle_request(event, conn): + request_headers = event.headers + for request_header in request_headers: + str_request_header = str(request_header[0], "utf-8") + if str_request_header == ":path": + request_path = str(request_header[1], "utf-8") + if "goaway_test_1" in request_path: + handle_goaway_test_1(event, conn) + + +def handle_events(conn, events): + for event in events: + if isinstance(event, h2.events.RequestReceived): + handle_request(event, conn) + + +def handle(sock): + config = h2.config.H2Configuration(client_side=False) + conn = h2.connection.H2Connection(config=config) + conn.initiate_connection() + sock.sendall(conn.data_to_send()) + + while True: + data = None + try: + data = sock.recv(BYTES) + except: + pass + if not data: + break + try: + events = conn.receive_data(data) + except Exception as e: + print(e) + break + handle_events(conn, events) + data_to_send = conn.data_to_send() + if data_to_send: + sock.sendall(data_to_send) + + +signal.signal(signal.SIGHUP, receive_signal) +signal.signal(signal.SIGINT, receive_signal) +signal.signal(signal.SIGQUIT, receive_signal) +signal.signal(signal.SIGILL, receive_signal) +signal.signal(signal.SIGTERM, receive_signal) + +sock = socket.socket() +sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) +sock.bind(('0.0.0.0', int(os.getenv('SERVER_LISTEN_PORT')))) +sock.listen(5) + 
+while True: + # The accept method blocks until someone attempts to connect to our TCP + # port: when they do, it returns a tuple: the first element is a new + # socket object, the second element is a tuple of the address the new + # connection is from + handle(sock.accept()[0]) diff --git a/tests/system_tests_http2.py b/tests/system_tests_http2.py index 66894f0..30524a5 100644 --- a/tests/system_tests_http2.py +++ b/tests/system_tests_http2.py @@ -24,6 +24,12 @@ import system_test from system_test import TestCase, Qdrouterd, QdManager, Process, SkipIfNeeded from subprocess import PIPE +h2hyper_installed = True +try: + import h2.connection # noqa F401: imported but unused +except ImportError: + h2hyper_installed = False + def python_37_available(): if sys.version_info >= (3, 7): @@ -73,8 +79,16 @@ def skip_test(): return True +def skip_h2_test(): + if python_37_available() and h2hyper_installed and curl_available(): + return False + return True + + class Http2TestBase(TestCase): - def run_curl(self, args=None, regexp=None, timeout=system_test.TIMEOUT, address=None): + def run_curl(self, args=None, regexp=None, + timeout=system_test.TIMEOUT, + address=None): # Tell with -m / --max-time the maximum time, in seconds, that you # allow the command line to spend before curl exits with a # timeout error code (28). 
@@ -692,3 +706,46 @@ class Http2TestEdgeToEdgeViaInteriorRouter(Http2TestBase, CommonHttp2Tests): def test_zzz_http_connector_delete(self): self.check_connector_delete(client_addr=self.router_qdra.http_addresses[0], server_addr=self.router_qdrb.addresses[0]) + + +class Http2TestGoAway(Http2TestBase): + @classmethod + def setUpClass(cls): + super(Http2TestGoAway, cls).setUpClass() + if skip_h2_test(): + return + cls.http2_server_name = "hyperh2_server" + os.environ['SERVER_LISTEN_PORT'] = str(cls.tester.get_port()) + cls.http2_server = cls.tester.http2server(name=cls.http2_server_name, + listen_port=int(os.getenv('SERVER_LISTEN_PORT')), + py_string='python3', + server_file="hyperh2_server.py") + name = "http2-test-router" + cls.connector_name = 'connectorToBeDeleted' + cls.connector_props = { + 'port': os.getenv('SERVER_LISTEN_PORT'), + 'address': 'examples', + 'host': '127.0.0.1', + 'protocolVersion': 'HTTP2', + 'name': cls.connector_name + } + config = Qdrouterd.Config([ + ('router', {'mode': 'standalone', 'id': 'QDR'}), + ('listener', {'port': cls.tester.get_port(), 'role': 'normal', 'host': '0.0.0.0'}), + + ('httpListener', {'port': cls.tester.get_port(), 'address': 'examples', + 'host': '127.0.0.1', 'protocolVersion': 'HTTP2'}), + ('httpConnector', cls.connector_props) + ]) + cls.router_qdra = cls.tester.qdrouterd(name, config, wait=True) + + @SkipIfNeeded(skip_h2_test(), + "Python 3.7 or greater, hyper-h2 and curl needed to run hyperhttp2 tests") + def test_goaway(self): + # Executes a request against the router at the /goaway_test_1 URL + # The router in turn forwards the request to the http2 server which + # responds with a GOAWAY frame. 
The router propagates this + # GOAWAY frame to the client and issues an HTTP 503 to the client + address = self.router_qdra.http_addresses[0] + "/goaway_test_1" + out = self.run_curl(address=address, args=["-i"]) + self.assertIn("HTTP/2 503", out) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org