Abacn commented on code in PR #38966: URL: https://github.com/apache/beam/pull/38966#discussion_r3423254078
########## sdks/python/apache_beam/io/external/xlang_mqttio_it_test.py: ########## @@ -0,0 +1,300 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Integration tests for the cross-language MQTT IO transforms +(ReadFromMqtt / WriteToMqtt), served by the messaging expansion service. + +Runs against an MQTT broker (Eclipse Mosquitto) started via testcontainers. +The DirectRunner tests use reads bounded with max_num_records; unbounded +(streaming) reads require a portable streaming runner (see the +MqttReadSchemaTransformProvider description) and are exercised by the +Prism runner test below. 
+""" + +import logging +import threading +import time +import unittest + +import pytest + +import apache_beam as beam +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.pipeline_options import StandardOptions +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import BeamAssertException +from apache_beam.testing.util import assert_that +from apache_beam.typehints.row_type import RowTypeConstraint + +# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports +try: + from apache_beam.io import ReadFromMqtt + from apache_beam.io import WriteToMqtt +except ImportError: + ReadFromMqtt = None + WriteToMqtt = None + +try: + from testcontainers.core.container import DockerContainer + from testcontainers.core.waiting_utils import wait_for_logs +except ImportError: + DockerContainer = None + +NUM_RECORDS = 3 +BYTES_ROW = RowTypeConstraint.from_fields([('bytes', bytes)]) + + +def _payload_count_and_prefix_matcher(expected_count, expected_prefix): + """Matches a bounded read of a continuous publisher: exactly + expected_count payloads, each starting with expected_prefix (the absolute + sequence numbers depend on when the reader subscribed).""" + def _matcher(actual): + actual = list(actual) + if len(actual) != expected_count: + raise BeamAssertException( + 'Expected %d payloads, got %d: %s' % + (expected_count, len(actual), actual)) + for payload in actual: + if not payload.startswith(expected_prefix): + raise BeamAssertException('Unexpected payload: %s' % payload) + + return _matcher + + [email protected]_messaging_java_expansion_service [email protected]( + DockerContainer is None, 'testcontainers package is not installed') [email protected]( + ReadFromMqtt is None or WriteToMqtt is None, + 'MQTT cross-language wrappers are not generated') [email protected]( + TestPipeline().get_pipeline_options().view_as(StandardOptions).runner + is None, + 'Do not run this test on precommit 
suites.') [email protected]( + 'Dataflow' in ( + TestPipeline().get_pipeline_options().view_as(StandardOptions).runner or + ''), + 'The testcontainers broker is not reachable from Dataflow workers; ' + 'a Dataflow variant would need a remotely hosted MQTT broker.') +class CrossLanguageMqttIOTest(unittest.TestCase): + def setUp(self): + self.start_mqtt_container(retries=3) + host = self.broker.get_container_host_ip() + port = self.broker.get_exposed_port(1883) + self.server_uri = 'tcp://%s:%s' % (host, port) + + def tearDown(self): + # Sometimes stopping the container raises ReadTimeout. We can ignore it + # here to avoid the test failure. + try: + self.broker.stop() + except Exception: + logging.error('Could not stop the MQTT broker container.') + + # Creating a container with testcontainers sometimes raises ReadTimeout + # error, so retry a couple of times. + def start_mqtt_container(self, retries): + for i in range(retries): + try: + # /mosquitto-no-auth.conf ships with the image and enables an + # anonymous listener on port 1883. + self.broker = DockerContainer('eclipse-mosquitto:2').with_command( + 'mosquitto -c /mosquitto-no-auth.conf').with_exposed_ports(1883) + self.broker.start() + wait_for_logs(self.broker, 'mosquitto version .* running', timeout=30) + break + except Exception as e: + # If start() succeeded but a later step (e.g. wait_for_logs) failed, + # stop the partially started container so the next retry / the raised + # error does not leak a running Docker container. 
+ try: + self.broker.stop() + except Exception: + pass + if i == retries - 1: + logging.error('Unable to initialize the MQTT broker container.') + raise e + + def _connection_configuration(self, topic, client_id): + return { + 'server_uri': self.server_uri, 'topic': topic, 'client_id': client_id + } + + def test_xlang_mqtt_read(self): + topic = 'xlang-mqtt-read-topic' + # MQTT has no message retention for regular messages, so publish + # continuously (via mosquitto_pub inside the broker container) until the + # bounded read collected what it needs. + stop_publishing = threading.Event() + + def publish_loop(): + container = self.broker.get_wrapped_container() + i = 0 + while not stop_publishing.is_set(): + container.exec_run( + ['mosquitto_pub', '-t', topic, '-m', 'msg-%d' % i, '-q', '1']) + i += 1 + time.sleep(0.5) + + publisher = threading.Thread(target=publish_loop, daemon=True) + publisher.start() + try: + with TestPipeline() as p: + p.not_use_test_runner_api = True + payloads = ( + p + | 'ReadFromMqtt' >> ReadFromMqtt( + connection_configuration=self._connection_configuration( + topic, 'xlang-mqtt-read'), + max_num_records=NUM_RECORDS, + max_read_time_seconds=120) + | 'ExtractBytes' >> beam.Map(lambda row: row.bytes)) + assert_that( + payloads, _payload_count_and_prefix_matcher(NUM_RECORDS, b'msg-')) + finally: + stop_publishing.set() + publisher.join() + + def test_xlang_mqtt_write(self): + topic = 'xlang-mqtt-write-topic' + expected_payloads = [b'msg-%d' % i for i in range(NUM_RECORDS)] + subscriber_result = {} + + def subscribe(): + # mosquitto_sub exits after receiving NUM_RECORDS messages (-C) or + # after the timeout (-W), printing one payload per line. 
+ container = self.broker.get_wrapped_container() + exit_code, output = container.exec_run([ + 'mosquitto_sub', + '-t', + topic, + '-q', + '1', + '-C', + str(NUM_RECORDS), + '-W', + '120' + ]) + subscriber_result['exit_code'] = exit_code + subscriber_result['output'] = output + + subscriber = threading.Thread(target=subscribe, daemon=True) + subscriber.start() + # Give the subscriber time to connect before publishing. + time.sleep(5) + + with TestPipeline() as p: + p.not_use_test_runner_api = True + _ = ( + p + | 'CreatePayloads' >> beam.Create(expected_payloads) + | 'ToRow' >> beam.Map(lambda payload: beam.Row(bytes=payload)). + with_output_types(BYTES_ROW) + | 'WriteToMqtt' >> WriteToMqtt( + connection_configuration=self._connection_configuration( + topic, 'xlang-mqtt-write'))) + + subscriber.join(timeout=150) + self.assertEqual(subscriber_result.get('exit_code'), 0) + received = sorted(subscriber_result.get('output', b'').split()) + self.assertEqual(sorted(expected_payloads), received) + + def test_xlang_mqtt_read_write_streaming_on_prism(self): + """Exercises the unbounded (streaming) path on the Prism runner, which + the legacy DirectRunner cannot run: an unbounded ReadFromMqtt feeding a + WriteToMqtt on a second topic. 
The result is observed with a + mosquitto_sub subscriber on the output topic, after which the + (never-terminating) pipeline is cancelled.""" + source_topic = 'xlang-mqtt-streaming-source' + sink_topic = 'xlang-mqtt-streaming-sink' + stop_publishing = threading.Event() + subscriber_result = {} + + def publish_loop(): + container = self.broker.get_wrapped_container() + i = 0 + while not stop_publishing.is_set(): + container.exec_run([ + 'mosquitto_pub', '-t', source_topic, '-m', 'msg-%d' % i, '-q', '1' + ]) + i += 1 + time.sleep(0.5) + + def subscribe(): + container = self.broker.get_wrapped_container() + exit_code, output = container.exec_run([ + 'mosquitto_sub', + '-t', + sink_topic, + '-q', + '1', + '-C', + str(NUM_RECORDS), + '-W', + '180' + ]) + subscriber_result['exit_code'] = exit_code + subscriber_result['output'] = output + + publisher = threading.Thread(target=publish_loop, daemon=True) + subscriber = threading.Thread(target=subscribe, daemon=True) + publisher.start() + subscriber.start() + + options = PipelineOptions([ Review Comment: Tests should use TestPipeline. I'm not sure how the coding agent ended up with two read tests when one test is sufficient (mqttio read is a streaming IO). If the typical test doesn't work, there is some issue in that test, and it should be fixed there; do not create a new one. If we follow other xlang tests, a single test runs two pipelines, first write then read, and checks the results. ########## sdks/python/apache_beam/io/external/xlang_mqttio_it_test.py: ########## @@ -0,0 +1,300 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Integration tests for the cross-language MQTT IO transforms +(ReadFromMqtt / WriteToMqtt), served by the messaging expansion service. + +Runs against an MQTT broker (Eclipse Mosquitto) started via testcontainers. +The DirectRunner tests use reads bounded with max_num_records; unbounded +(streaming) reads require a portable streaming runner (see the +MqttReadSchemaTransformProvider description) and are exercised by the +Prism runner test below. +""" + +import logging +import threading +import time +import unittest + +import pytest + +import apache_beam as beam +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.pipeline_options import StandardOptions +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import BeamAssertException +from apache_beam.testing.util import assert_that +from apache_beam.typehints.row_type import RowTypeConstraint + +# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports +try: + from apache_beam.io import ReadFromMqtt + from apache_beam.io import WriteToMqtt +except ImportError: + ReadFromMqtt = None + WriteToMqtt = None + +try: + from testcontainers.core.container import DockerContainer + from testcontainers.core.waiting_utils import wait_for_logs +except ImportError: + DockerContainer = None + +NUM_RECORDS = 3 +BYTES_ROW = RowTypeConstraint.from_fields([('bytes', bytes)]) + + +def _payload_count_and_prefix_matcher(expected_count, expected_prefix): + """Matches a bounded read of a continuous publisher: exactly + expected_count 
payloads, each starting with expected_prefix (the absolute + sequence numbers depend on when the reader subscribed).""" + def _matcher(actual): + actual = list(actual) + if len(actual) != expected_count: + raise BeamAssertException( + 'Expected %d payloads, got %d: %s' % + (expected_count, len(actual), actual)) + for payload in actual: + if not payload.startswith(expected_prefix): + raise BeamAssertException('Unexpected payload: %s' % payload) + + return _matcher + + [email protected]_messaging_java_expansion_service [email protected]( + DockerContainer is None, 'testcontainers package is not installed') [email protected]( + ReadFromMqtt is None or WriteToMqtt is None, + 'MQTT cross-language wrappers are not generated') [email protected]( + TestPipeline().get_pipeline_options().view_as(StandardOptions).runner + is None, + 'Do not run this test on precommit suites.') [email protected]( + 'Dataflow' in ( + TestPipeline().get_pipeline_options().view_as(StandardOptions).runner or + ''), + 'The testcontainers broker is not reachable from Dataflow workers; ' + 'a Dataflow variant would need a remotely hosted MQTT broker.') +class CrossLanguageMqttIOTest(unittest.TestCase): + def setUp(self): + self.start_mqtt_container(retries=3) Review Comment: Consider moving the container setup into setUpClass. It's expensive to spin up and tear down a container for every test. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
