This is an automated email from the ASF dual-hosted git repository.

johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e59f001fdb3 Feature/externalize dataflow xlang kafka tests (#27805)
e59f001fdb3 is described below

commit e59f001fdb3d5104cf7f82cdbe0b099e32c7b9c1
Author: johnjcasey <95318300+johnjca...@users.noreply.github.com>
AuthorDate: Tue Aug 15 11:30:18 2023 -0400

    Feature/externalize dataflow xlang kafka tests (#27805)
    
    * Update 2.50 release notes to include new Kafka topicPattern feature
    
    * Begin running Kafka xlang tests as part of dataflow test suites
    
    * Load the Kafka jar to enable running kafkaio tests.
    Fix linting errors
    
    * Update build dependency
    
    * Update dependency definitions
    
    * Configure kafka xlang test to check for running expansion service
    
    * fix lint issue & update expansion service to provide kafka by default
    
    * Take two on updating expansion service config
    
    * Have dataflow xlang kafka tests depend on hardcoded kafka instance
    
    * fix formatting
    
    * formatting
    
    * format
    
    * refactor IO Xlang to separate postcommit
    
    * update documentation
    
    * wire in xlang io postcommit to dataflow postcommits
    
    * split append failure exception message
    
    * revert README autoformat
    
    * Undo performance test autoformat
    
    * Move kafka configuration, fix README.md
    
    * fix module plugin
    
    * fix BeamModulePlugin
    
    * fix BeamModulePlugin
    
    * fix whitespace
---
 .test-infra/jenkins/README.md                      |  1 +
 ...tCommit_Python_CrossLanguage_IO_Dataflow.groovy | 55 ++++++++++++++++++++++
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |  7 +++
 release/src/main/scripts/jenkins_jobs.txt          |  1 +
 .../io/external/xlang_kafkaio_it_test.py           | 53 +++++++++++++++++++--
 sdks/python/test-suites/dataflow/build.gradle      |  6 +++
 sdks/python/test-suites/dataflow/common.gradle     |  3 +-
 sdks/python/test-suites/xlang/build.gradle         | 13 +++++
 8 files changed, 133 insertions(+), 6 deletions(-)

diff --git a/.test-infra/jenkins/README.md b/.test-infra/jenkins/README.md
index 02cddfdc65c..aae5842d274 100644
--- a/.test-infra/jenkins/README.md
+++ b/.test-infra/jenkins/README.md
@@ -162,6 +162,7 @@ Beam Jenkins overview page: 
[link](https://ci-beam.apache.org/)
 | beam_PostCommit_Python_VR_Spark | 
[cron](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/), 
[phrase](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/) | 
`Run Python Spark ValidatesRunner` | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark)
 |
 | beam_PostCommit_Python_Xlang_Gcp_Direct | 
[cron](https://ci-beam.apache.org/job/beam_PostCommit_Python_Xlang_Gcp_Direct/),
 
[phrase](https://ci-beam.apache.org/job/beam_PostCommit_Python_Xlang_Gcp_Direct_PR/)
 | `Run Python_Xlang_Gcp_Direct PostCommit` | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Python_Xlang_Gcp_Direct/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python_Xlang_Gcp_Direct/)
 |
 | beam_PostCommit_Python_Xlang_Gcp_Dataflow | 
[cron](https://ci-beam.apache.org/job/beam_PostCommit_Python_Xlang_Gcp_Dataflow/),
 
[phrase](https://ci-beam.apache.org/job/beam_PostCommit_Python_Xlang_Gcp_Dataflow_PR/)
 | `Run Python_Xlang_Gcp_Dataflow PostCommit` | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Python_Xlang_Gcp_Dataflow/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python_Xlang_Gcp_Dataflow/)
 |
+| beam_PostCommit_Python_Xlang_IO_Dataflow | 
[cron](https://ci-beam.apache.org/job/beam_PostCommit_Python_Xlang_IO_Dataflow/),
 
[phrase](https://ci-beam.apache.org/job/beam_PostCommit_Python_Xlang_IO_Dataflow_PR/)
 | `Run Python_Xlang_IO_Dataflow PostCommit` | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Python_Xlang_IO_Dataflow/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python_Xlang_IO_Dataflow/) |
 | beam_PostCommit_Python38 | 
[cron](https://ci-beam.apache.org/job/beam_PostCommit_Python38), 
[phrase](https://ci-beam.apache.org/job/beam_PostCommit_Python38_PR/) | `Run 
Python 3.8 PostCommit` | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Python38/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python38)
 |
 | beam_PostCommit_Python39 | 
[cron](https://ci-beam.apache.org/job/beam_PostCommit_Python39), 
[phrase](https://ci-beam.apache.org/job/beam_PostCommit_Python39_PR/) | `Run 
Python 3.9 PostCommit` | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Python39/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python39)
 |
 | beam_PostCommit_Python310 | 
[cron](https://ci-beam.apache.org/job/beam_PostCommit_Python310), 
[phrase](https://ci-beam.apache.org/job/beam_PostCommit_Python310_PR/) | `Run 
Python 3.10 PostCommit` | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Python310/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python310)
 |
diff --git 
a/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_IO_Dataflow.groovy 
b/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_IO_Dataflow.groovy
new file mode 100644
index 00000000000..9a37e76cbde
--- /dev/null
+++ b/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_IO_Dataflow.groovy
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import CommonJobProperties as commonJobProperties
+import PostcommitJobBuilder
+
+
+import static 
PythonTestProperties.CROSS_LANGUAGE_VALIDATES_RUNNER_PYTHON_VERSIONS
+
+
+// This job runs end-to-end cross language non-GCP IO tests with DataflowRunner.
+// Collects tests with the @pytest.mark.uses_io_expansion_service decorator
+PostcommitJobBuilder.postCommitJob('beam_PostCommit_Python_Xlang_IO_Dataflow',
+    'Run Python_Xlang_IO_Dataflow PostCommit', 'Python_Xlang_IO_Dataflow 
(\"Run Python_Xlang_IO_Dataflow PostCommit\")', this) {
+      description('Runs end-to-end cross language non-GCP IO tests on the 
Dataflow runner.')
+
+
+      // Set common parameters.
+      commonJobProperties.setTopLevelMainJobProperties(delegate, 'master', 180)
+
+
+      // Publish all test results to Jenkins
+      publishers {
+        archiveJunit('**/pytest*.xml')
+      }
+
+
+      // Gradle goals for this job.
+
+      steps {
+        gradle {
+          rootBuildScriptDir(commonJobProperties.checkoutDir)
+          tasks(":sdks:python:test-suites:dataflow:ioCrossLanguagePostCommit")
+          commonJobProperties.setGradleSwitches(delegate)
+          switches("-PuseWheelDistribution")
+          
switches("-PkafkaBootstrapServer=10.128.0.40:9094,10.128.0.28:9094,10.128.0.165:9094")
+        }
+      }
+    }
diff --git 
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy 
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index a79badee668..81b7a4f965a 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -331,6 +331,8 @@ class BeamModulePlugin implements Plugin<Project> {
     TaskProvider startJobServer
     // Job server cleanup task.
     TaskProvider cleanupJobServer
+    // any additional environment variables specific to the suite of tests
+    Map<String,String> additionalEnvs
   }
 
   // A class defining the configuration for CrossLanguageUsingJavaExpansion.
@@ -356,6 +358,8 @@ class BeamModulePlugin implements Plugin<Project> {
     String expansionProjectPath
     // Collect Python pipeline tests with this marker
     String collectMarker
+    // any additional environment variables to be exported
+    Map<String,String> additionalEnvs
   }
 
   // A class defining the configuration for CrossLanguageValidatesRunner.
@@ -2525,6 +2529,9 @@ class BeamModulePlugin implements Plugin<Project> {
           project.exec {
             environment "EXPANSION_JAR", expansionJar
             environment "EXPANSION_PORT", javaExpansionPort
+            for (envs in config.additionalEnvs){
+              environment envs.getKey(), envs.getValue()
+            }
             executable 'sh'
             args '-c', ". ${project.ext.envdir}/bin/activate && cd $pythonDir 
&& ./scripts/run_integration_test.sh $cmdArgs"
           }
diff --git a/release/src/main/scripts/jenkins_jobs.txt 
b/release/src/main/scripts/jenkins_jobs.txt
index 3cb83daa7ac..e85c8c3bfba 100644
--- a/release/src/main/scripts/jenkins_jobs.txt
+++ b/release/src/main/scripts/jenkins_jobs.txt
@@ -138,6 +138,7 @@ Run Python_Runners 
PreCommit,beam_PreCommit_Python_Runners_Phrase
 Run Python_Transforms PreCommit,beam_PreCommit_Python_Transforms_Phrase
 Run Python_Xlang_Gcp_Dataflow 
PostCommit,beam_PostCommit_Python_Xlang_Gcp_Dataflow_PR
 Run Python_Xlang_Gcp_Direct 
PostCommit,beam_PostCommit_Python_Xlang_Gcp_Direct_PR
+Run Python_Xlang_IO_Dataflow 
PostCommit,beam_PostCommit_Python_Xlang_IO_Dataflow_PR
 Run RAT PreCommit,beam_PreCommit_RAT_Phrase
 Run SQL PostCommit,beam_PostCommit_SQL_PR
 Run SQL PreCommit,beam_PreCommit_SQL_Phrase
diff --git a/sdks/python/apache_beam/io/external/xlang_kafkaio_it_test.py 
b/sdks/python/apache_beam/io/external/xlang_kafkaio_it_test.py
index edeb76aa0e7..a2f350e8cb7 100644
--- a/sdks/python/apache_beam/io/external/xlang_kafkaio_it_test.py
+++ b/sdks/python/apache_beam/io/external/xlang_kafkaio_it_test.py
@@ -28,6 +28,8 @@ import typing
 import unittest
 import uuid
 
+import pytest
+
 import apache_beam as beam
 from apache_beam.coders.coders import VarIntCoder
 from apache_beam.io.kafka import ReadFromKafka
@@ -110,11 +112,11 @@ class CrossLanguageKafkaIO(object):
     pipeline.run(False)
 
 
-@unittest.skipUnless(
-    os.environ.get('LOCAL_KAFKA_JAR'),
-    "LOCAL_KAFKA_JAR environment var is not provided.")
 class CrossLanguageKafkaIOTest(unittest.TestCase):
-  def test_kafkaio_populated_key(self):
+  @unittest.skipUnless(
+      os.environ.get('LOCAL_KAFKA_JAR'),
+      "LOCAL_KAFKA_JAR environment var is not provided.")
+  def test_local_kafkaio_populated_key(self):
     kafka_topic = 'xlang_kafkaio_test_populated_key_{}'.format(uuid.uuid4())
     local_kafka_jar = os.environ.get('LOCAL_KAFKA_JAR')
     with self.local_kafka_service(local_kafka_jar) as kafka_port:
@@ -126,7 +128,10 @@ class CrossLanguageKafkaIOTest(unittest.TestCase):
       self.run_kafka_write(pipeline_creator)
       self.run_kafka_read(pipeline_creator, b'key')
 
-  def test_kafkaio_null_key(self):
+  @unittest.skipUnless(
+      os.environ.get('LOCAL_KAFKA_JAR'),
+      "LOCAL_KAFKA_JAR environment var is not provided.")
+  def test_local_kafkaio_null_key(self):
     kafka_topic = 'xlang_kafkaio_test_null_key_{}'.format(uuid.uuid4())
     local_kafka_jar = os.environ.get('LOCAL_KAFKA_JAR')
     with self.local_kafka_service(local_kafka_jar) as kafka_port:
@@ -138,6 +143,44 @@ class CrossLanguageKafkaIOTest(unittest.TestCase):
       self.run_kafka_write(pipeline_creator)
       self.run_kafka_read(pipeline_creator, None)
 
+  @pytest.mark.uses_io_expansion_service
+  @unittest.skipUnless(
+      os.environ.get('EXPANSION_PORT'),
+      "EXPANSION_PORT environment var is not provided.")
+  @unittest.skipUnless(
+      os.environ.get('KAFKA_BOOTSTRAP_SERVER'),
+      "KAFKA_BOOTSTRAP_SERVER environment var is not provided.")
+  def test_hosted_kafkaio_populated_key(self):
+    kafka_topic = 'xlang_kafkaio_test_populated_key_{}'.format(uuid.uuid4())
+    bootstrap_servers = os.environ.get('KAFKA_BOOTSTRAP_SERVER')
+    pipeline_creator = CrossLanguageKafkaIO(
+        bootstrap_servers,
+        kafka_topic,
+        False,
+        'localhost:%s' % os.environ.get('EXPANSION_PORT'))
+
+    self.run_kafka_write(pipeline_creator)
+    self.run_kafka_read(pipeline_creator, b'key')
+
+  @pytest.mark.uses_io_expansion_service
+  @unittest.skipUnless(
+      os.environ.get('EXPANSION_PORT'),
+      "EXPANSION_PORT environment var is not provided.")
+  @unittest.skipUnless(
+      os.environ.get('KAFKA_BOOTSTRAP_SERVER'),
+      "KAFKA_BOOTSTRAP_SERVER environment var is not provided.")
+  def test_hosted_kafkaio_null_key(self):
+    kafka_topic = 'xlang_kafkaio_test_null_key_{}'.format(uuid.uuid4())
+    bootstrap_servers = os.environ.get('KAFKA_BOOTSTRAP_SERVER')
+    pipeline_creator = CrossLanguageKafkaIO(
+        bootstrap_servers,
+        kafka_topic,
+        True,
+        'localhost:%s' % os.environ.get('EXPANSION_PORT'))
+
+    self.run_kafka_write(pipeline_creator)
+    self.run_kafka_read(pipeline_creator, None)
+
   def run_kafka_write(self, pipeline_creator):
     with TestPipeline() as pipeline:
       pipeline.not_use_test_runner_api = True
diff --git a/sdks/python/test-suites/dataflow/build.gradle 
b/sdks/python/test-suites/dataflow/build.gradle
index 08f03e207f3..b55716a42df 100644
--- a/sdks/python/test-suites/dataflow/build.gradle
+++ b/sdks/python/test-suites/dataflow/build.gradle
@@ -72,6 +72,12 @@ task gcpCrossLanguagePostCommit {
   }
 }
 
+task ioCrossLanguagePostCommit {
+  getVersionsAsList('cross_language_validates_gcp_py_versions').each {
+    
dependsOn.add(":sdks:python:test-suites:dataflow:py${getVersionSuffix(it)}:ioCrossLanguagePythonUsingJava")
+  }
+}
+
 task tftTests {
   getVersionsAsList('dataflow_cloudml_benchmark_tests_py_versions').each {
     
dependsOn.add(":sdks:python:test-suites:dataflow:py${getVersionSuffix(it)}:tftTests")
diff --git a/sdks/python/test-suites/dataflow/common.gradle 
b/sdks/python/test-suites/dataflow/common.gradle
index 477e2830b4c..eee6e9d2188 100644
--- a/sdks/python/test-suites/dataflow/common.gradle
+++ b/sdks/python/test-suites/dataflow/common.gradle
@@ -493,6 +493,7 @@ 
project(":sdks:python:test-suites:xlang").ext.xlangTasks.each { taskMetadata ->
         
"--sdk_container_image=gcr.io/apache-beam-testing/beam-sdk/beam_python${project.ext.pythonVersion}_sdk:latest",
         
"--sdk_harness_container_image_overrides=.*java.*,gcr.io/apache-beam-testing/beam-sdk/beam_java8_sdk:latest"
       ],
-      pytestOptions: basicPytestOpts
+      pytestOptions: basicPytestOpts,
+      additionalEnvs: taskMetadata.additionalEnvs
     )
 }
diff --git a/sdks/python/test-suites/xlang/build.gradle 
b/sdks/python/test-suites/xlang/build.gradle
index ea407ac6f3f..df3ebdd1582 100644
--- a/sdks/python/test-suites/xlang/build.gradle
+++ b/sdks/python/test-suites/xlang/build.gradle
@@ -58,5 +58,18 @@ def gcpXlangCommon = new CrossLanguageTaskCommon().tap {
 }
 xlangTasks.add(gcpXlangCommon)
 
+def ioExpansionProject = project.project(':sdks:java:io:expansion-service')
+
+def ioXlangCommon = new CrossLanguageTaskCommon().tap {
+    name = "ioCrossLanguage"
+    expansionProjectPath = ioExpansionProject.getPath()
+    collectMarker = "uses_io_expansion_service"
+    startJobServer = setupTask
+    cleanupJobServer = cleanupTask
+    //See .test-infra/kafka/bitnami/README.md for setup instructions
+    additionalEnvs = 
["KAFKA_BOOTSTRAP_SERVER":project.findProperty('kafkaBootstrapServer')]
+}
+
+xlangTasks.add(ioXlangCommon)
 
 ext.xlangTasks = xlangTasks
\ No newline at end of file

Reply via email to