This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f1f5f775252 [prism] Skip python tests that require an expansion 
service at this time. (#32182)
f1f5f775252 is described below

commit f1f5f775252a50929a96388b91e4497051aaec91
Author: Robert Burke <lostl...@users.noreply.github.com>
AuthorDate: Wed Aug 14 15:39:29 2024 -0700

    [prism] Skip python tests that require an expansion service at this time. 
(#32182)
    
    * [prism] Skip tests that require an expansion service at this time.
    
    * Remove extra python imports.
    
    ---------
    
    Co-authored-by: lostluck <13907733+lostl...@users.noreply.github.com>
---
 .../runners/portability/prism_runner_test.py       | 77 ++--------------------
 1 file changed, 4 insertions(+), 73 deletions(-)

diff --git a/sdks/python/apache_beam/runners/portability/prism_runner_test.py 
b/sdks/python/apache_beam/runners/portability/prism_runner_test.py
index b179156877e..387b7ba2bec 100644
--- a/sdks/python/apache_beam/runners/portability/prism_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/prism_runner_test.py
@@ -30,11 +30,6 @@ from tempfile import mkdtemp
 import pytest
 
 import apache_beam as beam
-from apache_beam import Impulse
-from apache_beam import Map
-from apache_beam.io.external.generate_sequence import GenerateSequence
-from apache_beam.io.kafka import ReadFromKafka
-from apache_beam.io.kafka import WriteToKafka
 from apache_beam.options.pipeline_options import DebugOptions
 from apache_beam.options.pipeline_options import PortableOptions
 from apache_beam.runners.portability import portable_runner_test
@@ -42,7 +37,6 @@ from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
 from apache_beam.transforms import userstate
 from apache_beam.transforms import window
-from apache_beam.transforms.sql import SqlTransform
 from apache_beam.utils import timestamp
 
 # Run as
@@ -240,79 +234,16 @@ class 
PrismRunnerTest(portable_runner_test.PortableRunnerTest):
       assert_that(lines, lambda lines: len(lines) > 0)
 
   def test_external_transform(self):
-    with self.create_pipeline() as p:
-      res = (
-          p
-          | GenerateSequence(
-              start=1, stop=10, 
expansion_service=self.get_expansion_service()))
-
-      assert_that(res, equal_to([i for i in range(1, 10)]))
+    raise unittest.SkipTest("Requires an expansion service to execute.")
 
   def test_expand_kafka_read(self):
-    # We expect to fail here because we do not have a Kafka cluster handy.
-    # Nevertheless, we check that the transform is expanded by the
-    # ExpansionService and that the pipeline fails during execution.
-    with self.assertRaises(Exception) as ctx:
-      self.enable_commit = True
-      with self.create_pipeline() as p:
-        # pylint: disable=expression-not-assigned
-        (
-            p
-            | ReadFromKafka(
-                consumer_config={
-                    'bootstrap.servers': 'notvalid1:7777, notvalid2:3531',
-                    'group.id': 'any_group'
-                },
-                topics=['topic1', 'topic2'],
-                key_deserializer='org.apache.kafka.'
-                'common.serialization.'
-                'ByteArrayDeserializer',
-                value_deserializer='org.apache.kafka.'
-                'common.serialization.'
-                'LongDeserializer',
-                commit_offset_in_finalize=True,
-                timestamp_policy=ReadFromKafka.create_time_policy,
-                expansion_service=self.get_expansion_service()))
-    self.assertTrue(
-        'No resolvable bootstrap urls given in bootstrap.servers' in str(
-            ctx.exception),
-        'Expected to fail due to invalid bootstrap.servers, but '
-        'failed due to:\n%s' % str(ctx.exception))
+    raise unittest.SkipTest("Requires an expansion service to execute.")
 
   def test_expand_kafka_write(self):
-    # We just test the expansion but do not execute.
-    # pylint: disable=expression-not-assigned
-    (
-        self.create_pipeline()
-        | Impulse()
-        | Map(lambda input: (1, input))
-        | WriteToKafka(
-            producer_config={
-                'bootstrap.servers': 'localhost:9092, notvalid2:3531'
-            },
-            topic='topic1',
-            key_serializer='org.apache.kafka.'
-            'common.serialization.'
-            'LongSerializer',
-            value_serializer='org.apache.kafka.'
-            'common.serialization.'
-            'ByteArraySerializer',
-            expansion_service=self.get_expansion_service()))
+    raise unittest.SkipTest("Requires an expansion service to execute.")
 
   def test_sql(self):
-    with self.create_pipeline() as p:
-      output = (
-          p
-          | 'Create' >> beam.Create([Row(x, str(x)) for x in range(5)])
-          | 'Sql' >> SqlTransform(
-              """SELECT col1, col2 || '*' || col2 as col2,
-                    power(col1, 2) as col3
-             FROM PCOLLECTION
-          """,
-              expansion_service=self.get_expansion_service()))
-      assert_that(
-          output,
-          equal_to([(x, '{x}*{x}'.format(x=x), x * x) for x in range(5)]))
+    raise unittest.SkipTest("Requires an expansion service to execute.")
 
 
 # Inherits all other tests.

Reply via email to