This is an automated email from the ASF dual-hosted git repository. lostluck pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new f1f5f775252 [prism] Skip python tests that require an expansion service at this time. (#32182) f1f5f775252 is described below commit f1f5f775252a50929a96388b91e4497051aaec91 Author: Robert Burke <lostl...@users.noreply.github.com> AuthorDate: Wed Aug 14 15:39:29 2024 -0700 [prism] Skip python tests that require an expansion service at this time. (#32182) * [prism] Skip tests that require an expansion service at this time. * Remove extra python imports. --------- Co-authored-by: lostluck <13907733+lostl...@users.noreply.github.com> --- .../runners/portability/prism_runner_test.py | 77 ++-------------------- 1 file changed, 4 insertions(+), 73 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/prism_runner_test.py b/sdks/python/apache_beam/runners/portability/prism_runner_test.py index b179156877e..387b7ba2bec 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner_test.py @@ -30,11 +30,6 @@ from tempfile import mkdtemp import pytest import apache_beam as beam -from apache_beam import Impulse -from apache_beam import Map -from apache_beam.io.external.generate_sequence import GenerateSequence -from apache_beam.io.kafka import ReadFromKafka -from apache_beam.io.kafka import WriteToKafka from apache_beam.options.pipeline_options import DebugOptions from apache_beam.options.pipeline_options import PortableOptions from apache_beam.runners.portability import portable_runner_test @@ -42,7 +37,6 @@ from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to from apache_beam.transforms import userstate from apache_beam.transforms import window -from apache_beam.transforms.sql import SqlTransform from apache_beam.utils import timestamp # Run as @@ -240,79 +234,16 @@ class PrismRunnerTest(portable_runner_test.PortableRunnerTest): assert_that(lines, lambda lines: len(lines) > 0) def test_external_transform(self): - with self.create_pipeline() as p: - res = ( - p - | GenerateSequence( - start=1, stop=10, expansion_service=self.get_expansion_service())) - - assert_that(res, equal_to([i for i in range(1, 10)])) + raise unittest.SkipTest("Requires an expansion service to execute.") def test_expand_kafka_read(self): - # We expect to fail here because we do not have a Kafka cluster handy. - # Nevertheless, we check that the transform is expanded by the - # ExpansionService and that the pipeline fails during execution. - with self.assertRaises(Exception) as ctx: - self.enable_commit = True - with self.create_pipeline() as p: - # pylint: disable=expression-not-assigned - ( - p - | ReadFromKafka( - consumer_config={ - 'bootstrap.servers': 'notvalid1:7777, notvalid2:3531', - 'group.id': 'any_group' - }, - topics=['topic1', 'topic2'], - key_deserializer='org.apache.kafka.' - 'common.serialization.' - 'ByteArrayDeserializer', - value_deserializer='org.apache.kafka.' - 'common.serialization.' - 'LongDeserializer', - commit_offset_in_finalize=True, - timestamp_policy=ReadFromKafka.create_time_policy, - expansion_service=self.get_expansion_service())) - self.assertTrue( - 'No resolvable bootstrap urls given in bootstrap.servers' in str( - ctx.exception), - 'Expected to fail due to invalid bootstrap.servers, but ' - 'failed due to:\n%s' % str(ctx.exception)) + raise unittest.SkipTest("Requires an expansion service to execute.") def test_expand_kafka_write(self): - # We just test the expansion but do not execute. - # pylint: disable=expression-not-assigned - ( - self.create_pipeline() - | Impulse() - | Map(lambda input: (1, input)) - | WriteToKafka( - producer_config={ - 'bootstrap.servers': 'localhost:9092, notvalid2:3531' - }, - topic='topic1', - key_serializer='org.apache.kafka.' - 'common.serialization.' - 'LongSerializer', - value_serializer='org.apache.kafka.' - 'common.serialization.' - 'ByteArraySerializer', - expansion_service=self.get_expansion_service())) + raise unittest.SkipTest("Requires an expansion service to execute.") def test_sql(self): - with self.create_pipeline() as p: - output = ( - p - | 'Create' >> beam.Create([Row(x, str(x)) for x in range(5)]) - | 'Sql' >> SqlTransform( - """SELECT col1, col2 || '*' || col2 as col2, - power(col1, 2) as col3 - FROM PCOLLECTION - """, - expansion_service=self.get_expansion_service())) - assert_that( - output, - equal_to([(x, '{x}*{x}'.format(x=x), x * x) for x in range(5)])) + raise unittest.SkipTest("Requires an expansion service to execute.") # Inherits all other tests.