This is an automated email from the ASF dual-hosted git repository.

johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new e59f001fdb3 Feature/externalize dataflow xlang kafka tests (#27805)
e59f001fdb3 is described below

commit e59f001fdb3d5104cf7f82cdbe0b099e32c7b9c1
Author: johnjcasey <95318300+johnjca...@users.noreply.github.com>
AuthorDate: Tue Aug 15 11:30:18 2023 -0400

    Feature/externalize dataflow xlang kafka tests (#27805)

    * Update 2.50 release notes to include new Kafka topicPattern feature
    * Begin running Kafka xlang tests as part of dataflow test suites
    * Load the Kafka jar to enable running kafkaio tests. Fix linting errors
    * Update build dependency
    * Update dependency definitions
    * Configure kafka xlang test to check for running expansion service
    * fix lint issue & update expansion service to provide kafka by default
    * Take two on updating expansion service config
    * Have dataflow xlang kafka tests depend on hardcoded kafka instance
    * fix formatting
    * formatting
    * format
    * refactor IO Xlang to separate postcommit
    * update documentation
    * wire in xlang io postcommit to dataflow postcommits
    * split append failure exception message
    * revert README autoformat
    * Undo performance test autoformat
    * Move kafka configuration, fix README.md
    * fix module plugin
    * fix BeamModulePlugin
    * fix BeamModulePlugin
    * fix whitespace
---
 .test-infra/jenkins/README.md                      |  1 +
 ...tCommit_Python_CrossLanguage_IO_Dataflow.groovy | 55 ++++++++++++++++++++++
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |  7 +++
 release/src/main/scripts/jenkins_jobs.txt          |  1 +
 .../io/external/xlang_kafkaio_it_test.py           | 53 +++++++++++++++++++--
 sdks/python/test-suites/dataflow/build.gradle      |  6 +++
 sdks/python/test-suites/dataflow/common.gradle     |  3 +-
 sdks/python/test-suites/xlang/build.gradle         | 13 +++++
 8 files changed, 133 insertions(+), 6 deletions(-)

diff --git a/.test-infra/jenkins/README.md b/.test-infra/jenkins/README.md
index 02cddfdc65c..aae5842d274 100644
--- a/.test-infra/jenkins/README.md
+++ b/.test-infra/jenkins/README.md
@@ -162,6 +162,7 @@ Beam Jenkins overview page: [link](https://ci-beam.apache.org/)
 | beam_PostCommit_Python_VR_Spark | [cron](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/), [phrase](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/) | `Run Python Spark ValidatesRunner` | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark) |
 | beam_PostCommit_Python_Xlang_Gcp_Direct | [cron](https://ci-beam.apache.org/job/beam_PostCommit_Python_Xlang_Gcp_Direct/), [phrase](https://ci-beam.apache.org/job/beam_PostCommit_Python_Xlang_Gcp_Direct_PR/) | `Run Python_Xlang_Gcp_Direct PostCommit` | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python_Xlang_Gcp_Direct/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python_Xlang_Gcp_Direct/) |
 | beam_PostCommit_Python_Xlang_Gcp_Dataflow | [cron](https://ci-beam.apache.org/job/beam_PostCommit_Python_Xlang_Gcp_Dataflow/), [phrase](https://ci-beam.apache.org/job/beam_PostCommit_Python_Xlang_Gcp_Dataflow_PR/) | `Run Python_Xlang_Gcp_Dataflow PostCommit` | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python_Xlang_Gcp_Dataflow/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python_Xlang_Gcp_Dataflow/) |
+| beam_PostCommit_Python_Xlang_IO_Dataflow | [cron](https://ci-beam.apache.org/job/beam_PostCommit_Python_Xlang_IO_Dataflow/), [phrase](https://ci-beam.apache.org/job/beam_PostCommit_Python_Xlang_IO_Dataflow_PR/) | `Run Python_Xlang_IO_Dataflow PostCommit` | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python_Xlang_IO_Dataflow/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python_Xlang_IO_Dataflow/|
 | beam_PostCommit_Python38 | [cron](https://ci-beam.apache.org/job/beam_PostCommit_Python38), [phrase](https://ci-beam.apache.org/job/beam_PostCommit_Python38_PR/) | `Run Python 3.8 PostCommit` | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python38/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python38) |
 | beam_PostCommit_Python39 | [cron](https://ci-beam.apache.org/job/beam_PostCommit_Python39), [phrase](https://ci-beam.apache.org/job/beam_PostCommit_Python39_PR/) | `Run Python 3.9 PostCommit` | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python39/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python39) |
 | beam_PostCommit_Python310 | [cron](https://ci-beam.apache.org/job/beam_PostCommit_Python310), [phrase](https://ci-beam.apache.org/job/beam_PostCommit_Python310_PR/) | `Run Python 3.10 PostCommit` | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python310/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python310) |
diff --git a/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_IO_Dataflow.groovy b/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_IO_Dataflow.groovy
new file mode 100644
index 00000000000..9a37e76cbde
--- /dev/null
+++ b/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_IO_Dataflow.groovy
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import CommonJobProperties as commonJobProperties
+import PostcommitJobBuilder
+
+
+import static PythonTestProperties.CROSS_LANGUAGE_VALIDATES_RUNNER_PYTHON_VERSIONS
+
+
+// This job runs end-to-end cross language GCP IO tests with DataflowRunner.
+// Collects tests with the @pytest.mark.uses_io_java_expansion_service decorator
+PostcommitJobBuilder.postCommitJob('beam_PostCommit_Python_Xlang_IO_Dataflow',
+    'Run Python_Xlang_IO_Dataflow PostCommit', 'Python_Xlang_IO_Dataflow (\"Run Python_Xlang_IO_Dataflow PostCommit\")', this) {
+      description('Runs end-to-end cross language non-GCP IO tests on the Dataflow runner.')
+
+
+      // Set common parameters.
+      commonJobProperties.setTopLevelMainJobProperties(delegate, 'master', 180)
+
+
+      // Publish all test results to Jenkins
+      publishers {
+        archiveJunit('**/pytest*.xml')
+      }
+
+
+      // Gradle goals for this job.
+
+      steps {
+        gradle {
+          rootBuildScriptDir(commonJobProperties.checkoutDir)
+          tasks(":sdks:python:test-suites:dataflow:ioCrossLanguagePostCommit")
+          commonJobProperties.setGradleSwitches(delegate)
+          switches("-PuseWheelDistribution")
+          switches("-PkafkaBootstrapServer=10.128.0.40:9094,10.128.0.28:9094,10.128.0.165:9094")
+        }
+      }
+    }
diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index a79badee668..81b7a4f965a 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -331,6 +331,8 @@ class BeamModulePlugin implements Plugin<Project> {
     TaskProvider startJobServer
     // Job server cleanup task.
     TaskProvider cleanupJobServer
+    // any additional environment variables specific to the suite of tests
+    Map<String,String> additionalEnvs
   }
 
   // A class defining the configuration for CrossLanguageUsingJavaExpansion.
@@ -356,6 +358,8 @@ class BeamModulePlugin implements Plugin<Project> {
     String expansionProjectPath
     // Collect Python pipeline tests with this marker
     String collectMarker
+    // any additional environment variables to be exported
+    Map<String,String> additionalEnvs
   }
 
   // A class defining the configuration for CrossLanguageValidatesRunner.
@@ -2525,6 +2529,9 @@ class BeamModulePlugin implements Plugin<Project> {
           project.exec {
             environment "EXPANSION_JAR", expansionJar
             environment "EXPANSION_PORT", javaExpansionPort
+            for (envs in config.additionalEnvs){
+              environment envs.getKey(), envs.getValue()
+            }
             executable 'sh'
             args '-c', ". ${project.ext.envdir}/bin/activate && cd $pythonDir && ./scripts/run_integration_test.sh $cmdArgs"
           }
diff --git a/release/src/main/scripts/jenkins_jobs.txt b/release/src/main/scripts/jenkins_jobs.txt
index 3cb83daa7ac..e85c8c3bfba 100644
--- a/release/src/main/scripts/jenkins_jobs.txt
+++ b/release/src/main/scripts/jenkins_jobs.txt
@@ -138,6 +138,7 @@ Run Python_Runners PreCommit,beam_PreCommit_Python_Runners_Phrase
 Run Python_Transforms PreCommit,beam_PreCommit_Python_Transforms_Phrase
 Run Python_Xlang_Gcp_Dataflow PostCommit,beam_PostCommit_Python_Xlang_Gcp_Dataflow_PR
 Run Python_Xlang_Gcp_Direct PostCommit,beam_PostCommit_Python_Xlang_Gcp_Direct_PR
+Run Python_Xlang_IO_Dataflow PostCommit,beam_PostCommit_Python_Xlang_IO_Dataflow_PR
 Run RAT PreCommit,beam_PreCommit_RAT_Phrase
 Run SQL PostCommit,beam_PostCommit_SQL_PR
 Run SQL PreCommit,beam_PreCommit_SQL_Phrase
diff --git a/sdks/python/apache_beam/io/external/xlang_kafkaio_it_test.py b/sdks/python/apache_beam/io/external/xlang_kafkaio_it_test.py
index edeb76aa0e7..a2f350e8cb7 100644
--- a/sdks/python/apache_beam/io/external/xlang_kafkaio_it_test.py
+++ b/sdks/python/apache_beam/io/external/xlang_kafkaio_it_test.py
@@ -28,6 +28,8 @@ import typing
 import unittest
 import uuid
 
+import pytest
+
 import apache_beam as beam
 from apache_beam.coders.coders import VarIntCoder
 from apache_beam.io.kafka import ReadFromKafka
@@ -110,11 +112,11 @@ class CrossLanguageKafkaIO(object):
     pipeline.run(False)
 
 
-@unittest.skipUnless(
-    os.environ.get('LOCAL_KAFKA_JAR'),
-    "LOCAL_KAFKA_JAR environment var is not provided.")
 class CrossLanguageKafkaIOTest(unittest.TestCase):
-  def test_kafkaio_populated_key(self):
+  @unittest.skipUnless(
+      os.environ.get('LOCAL_KAFKA_JAR'),
+      "LOCAL_KAFKA_JAR environment var is not provided.")
+  def test_local_kafkaio_populated_key(self):
     kafka_topic = 'xlang_kafkaio_test_populated_key_{}'.format(uuid.uuid4())
     local_kafka_jar = os.environ.get('LOCAL_KAFKA_JAR')
     with self.local_kafka_service(local_kafka_jar) as kafka_port:
@@ -126,7 +128,10 @@ class CrossLanguageKafkaIOTest(unittest.TestCase):
       self.run_kafka_write(pipeline_creator)
       self.run_kafka_read(pipeline_creator, b'key')
 
-  def test_kafkaio_null_key(self):
+  @unittest.skipUnless(
+      os.environ.get('LOCAL_KAFKA_JAR'),
+      "LOCAL_KAFKA_JAR environment var is not provided.")
+  def test_local_kafkaio_null_key(self):
     kafka_topic = 'xlang_kafkaio_test_null_key_{}'.format(uuid.uuid4())
     local_kafka_jar = os.environ.get('LOCAL_KAFKA_JAR')
     with self.local_kafka_service(local_kafka_jar) as kafka_port:
@@ -138,6 +143,44 @@ class CrossLanguageKafkaIOTest(unittest.TestCase):
       self.run_kafka_write(pipeline_creator)
       self.run_kafka_read(pipeline_creator, None)
 
+  @pytest.mark.uses_io_expansion_service
+  @unittest.skipUnless(
+      os.environ.get('EXPANSION_PORT'),
+      "EXPANSION_PORT environment var is not provided.")
+  @unittest.skipUnless(
+      os.environ.get('KAFKA_BOOTSTRAP_SERVER'),
+      "KAFKA_BOOTSTRAP_SERVER environment var is not provided.")
+  def test_hosted_kafkaio_populated_key(self):
+    kafka_topic = 'xlang_kafkaio_test_populated_key_{}'.format(uuid.uuid4())
+    bootstrap_servers = os.environ.get('KAFKA_BOOTSTRAP_SERVER')
+    pipeline_creator = CrossLanguageKafkaIO(
+        bootstrap_servers,
+        kafka_topic,
+        False,
+        'localhost:%s' % os.environ.get('EXPANSION_PORT'))
+
+    self.run_kafka_write(pipeline_creator)
+    self.run_kafka_read(pipeline_creator, b'key')
+
+  @pytest.mark.uses_io_expansion_service
+  @unittest.skipUnless(
+      os.environ.get('EXPANSION_PORT'),
+      "EXPANSION_PORT environment var is not provided.")
+  @unittest.skipUnless(
+      os.environ.get('KAFKA_BOOTSTRAP_SERVER'),
+      "KAFKA_BOOTSTRAP_SERVER environment var is not provided.")
+  def test_hosted_kafkaio_null_key(self):
+    kafka_topic = 'xlang_kafkaio_test_null_key_{}'.format(uuid.uuid4())
+    bootstrap_servers = os.environ.get('KAFKA_BOOTSTRAP_SERVER')
+    pipeline_creator = CrossLanguageKafkaIO(
+        bootstrap_servers,
+        kafka_topic,
+        True,
+        'localhost:%s' % os.environ.get('EXPANSION_PORT'))
+
+    self.run_kafka_write(pipeline_creator)
+    self.run_kafka_read(pipeline_creator, None)
+
   def run_kafka_write(self, pipeline_creator):
     with TestPipeline() as pipeline:
       pipeline.not_use_test_runner_api = True
diff --git a/sdks/python/test-suites/dataflow/build.gradle b/sdks/python/test-suites/dataflow/build.gradle
index 08f03e207f3..b55716a42df 100644
--- a/sdks/python/test-suites/dataflow/build.gradle
+++ b/sdks/python/test-suites/dataflow/build.gradle
@@ -72,6 +72,12 @@ task gcpCrossLanguagePostCommit {
   }
 }
 
+task ioCrossLanguagePostCommit {
+  getVersionsAsList('cross_language_validates_gcp_py_versions').each {
+    dependsOn.add(":sdks:python:test-suites:dataflow:py${getVersionSuffix(it)}:ioCrossLanguagePythonUsingJava")
+  }
+}
+
 task tftTests {
   getVersionsAsList('dataflow_cloudml_benchmark_tests_py_versions').each {
     dependsOn.add(":sdks:python:test-suites:dataflow:py${getVersionSuffix(it)}:tftTests")
diff --git a/sdks/python/test-suites/dataflow/common.gradle b/sdks/python/test-suites/dataflow/common.gradle
index 477e2830b4c..eee6e9d2188 100644
--- a/sdks/python/test-suites/dataflow/common.gradle
+++ b/sdks/python/test-suites/dataflow/common.gradle
@@ -493,6 +493,7 @@ project(":sdks:python:test-suites:xlang").ext.xlangTasks.each { taskMetadata ->
       "--sdk_container_image=gcr.io/apache-beam-testing/beam-sdk/beam_python${project.ext.pythonVersion}_sdk:latest",
       "--sdk_harness_container_image_overrides=.*java.*,gcr.io/apache-beam-testing/beam-sdk/beam_java8_sdk:latest"
     ],
-    pytestOptions: basicPytestOpts
+    pytestOptions: basicPytestOpts,
+    additionalEnvs: taskMetadata.additionalEnvs
   )
 }
diff --git a/sdks/python/test-suites/xlang/build.gradle b/sdks/python/test-suites/xlang/build.gradle
index ea407ac6f3f..df3ebdd1582 100644
--- a/sdks/python/test-suites/xlang/build.gradle
+++ b/sdks/python/test-suites/xlang/build.gradle
@@ -58,5 +58,18 @@ def gcpXlangCommon = new CrossLanguageTaskCommon().tap {
 }
 xlangTasks.add(gcpXlangCommon)
 
+def ioExpansionProject = project.project(':sdks:java:io:expansion-service')
+
+def ioXlangCommon = new CrossLanguageTaskCommon().tap {
+    name = "ioCrossLanguage"
+    expansionProjectPath = ioExpansionProject.getPath()
+    collectMarker = "uses_io_expansion_service"
+    startJobServer = setupTask
+    cleanupJobServer = cleanupTask
+    //See .test-infra/kafka/bitnami/README.md for setup instructions
+    additionalEnvs = ["KAFKA_BOOTSTRAP_SERVER":project.findProperty('kafkaBootstrapServer')]
+}
+
+xlangTasks.add(ioXlangCommon)
 
 ext.xlangTasks = xlangTasks
\ No newline at end of file
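
The hosted Kafka tests added to xlang_kafkaio_it_test.py in this commit run only when both EXPANSION_PORT and KAFKA_BOOTSTRAP_SERVER are set, and they build the expansion service address as 'localhost:%s'. The sketch below isolates that gating pattern without Beam or Kafka. The helper names missing_env_vars and expansion_service_address and the class HostedKafkaGateSketch are hypothetical and are not part of the commit.

```python
import os
import unittest

# Environment variables that the hosted tests in the diff check before running.
REQUIRED_ENV_VARS = ('EXPANSION_PORT', 'KAFKA_BOOTSTRAP_SERVER')


def missing_env_vars(env):
  """Hypothetical helper: names of required variables that are unset or empty."""
  return [name for name in REQUIRED_ENV_VARS if not env.get(name)]


def expansion_service_address(env):
  """Formats the address the same way the diff does ('localhost:%s')."""
  return 'localhost:%s' % env['EXPANSION_PORT']


class HostedKafkaGateSketch(unittest.TestCase):
  # Assumption: one combined skip condition stands in for the two stacked
  # skipUnless decorators in the diff; the skip behavior is the same.
  @unittest.skipUnless(
      not missing_env_vars(os.environ),
      'EXPANSION_PORT and KAFKA_BOOTSTRAP_SERVER must both be set.')
  def test_hosted_settings_are_usable(self):
    address = expansion_service_address(os.environ)
    self.assertTrue(address.startswith('localhost:'))
    self.assertTrue(os.environ['KAFKA_BOOTSTRAP_SERVER'])
```

Saved as a module and run with python -m unittest, the test is reported as skipped unless both variables are exported, which mirrors how the Gradle additionalEnvs map in this commit is meant to hand KAFKA_BOOTSTRAP_SERVER to the test process.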