chamikaramj commented on code in PR #17674:
URL: https://github.com/apache/beam/pull/17674#discussion_r876198486
##########
buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy:
##########
@@ -2283,6 +2275,7 @@ class BeamModulePlugin implements Plugin<Project> {
"java_expansion_service_jar": expansionJar,
"java_port": javaPort,
"java_expansion_service_allowlist_file": javaClassLookupAllowlistFile,
+ "python_expansion_service_fqn_glob": "\\*",
Review Comment:
"python_expansion_service_allowlist" ?
##########
sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/DataframeTransformTest.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.python;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.UsesPythonExpansionService;
+import org.apache.beam.sdk.testing.ValidatesRunner;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.util.PythonCallableSource;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.ConnectivityState;
+import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.ManagedChannelBuilder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class DataframeTransformTest {
+ @Rule public transient TestPipeline testPipeline = TestPipeline.create();
+ private PipelineResult pipelineResult;
+ private static String expansionAddr;
+
+ @BeforeClass
+ public static void setUpClass() {
Review Comment:
Can these setup/teardown methods be moved to a common class instead of
repeating in all Java x-lang tests ?
##########
examples/java/src/main/java/org/apache/beam/examples/multilang/PythonDataframeWordCount.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
Review Comment:
Can we add this to "examples/multi-language" instead ?
https://github.com/apache/beam/tree/master/examples/multi-language
##########
buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy:
##########
@@ -351,15 +351,6 @@ class BeamModulePlugin implements Plugin<Project> {
Integer numParallelTests = 1
// Whether the pipeline needs --sdk_location option
boolean needsSdkLocation = false
- // Categories for Java tests to run.
- Closure javaTestCategories = {
- includeCategories
'org.apache.beam.sdk.testing.UsesCrossLanguageTransforms'
- // Use the following to include / exclude categories:
Review Comment:
Should this comment be removed ?
##########
examples/java/src/main/java/org/apache/beam/examples/multilang/PythonDataframeWordCount.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.multilang;
+
+import org.apache.beam.examples.common.ExampleUtils;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.python.PythonExternalTransform;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation.Required;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.util.PythonCallableSource;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * An example that counts words in Shakespeare and utilizes a Python external
transform.
+ *
+ * <p>This class, {@link PythonDataframeWordCount}, uses Python
DataframeTransform to count words
+ * from the input text file. The Python expansion service provided by
--expansionService must allow
+ * the expansion of apache_beam.dataframe.transforms.DataframeTransform (which
can be done by
+ * passing --fully_qualified_name_glob commandline option when launching the
expansion service).
+ *
+ * <p>Note that, for using Dataflow Runner, you should specify the following
two additional
+ * arguments:
+ *
+ * <pre>{@code
+ * --experiments=use_runner_v2
+ *
--sdkHarnessContainerImageOverrides=.*python.*,gcr.io/apache-beam-testing/beam-sdk/beam_python3.8_sdk:latest
Review Comment:
In the following examples, I simplified by providing instructions for
running with the latest released Beam version. Can we do the same here ? Users
should be able to just run this example without additional setup (no need to
startup an expansion service or push containers for example).
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/kafkataxi/README.md
https://github.com/apache/beam/tree/master/examples/multi-language
##########
sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/DataframeTransformTest.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.python;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.UsesPythonExpansionService;
+import org.apache.beam.sdk.testing.ValidatesRunner;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.util.PythonCallableSource;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.ConnectivityState;
+import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.ManagedChannelBuilder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class DataframeTransformTest {
+ @Rule public transient TestPipeline testPipeline = TestPipeline.create();
+ private PipelineResult pipelineResult;
+ private static String expansionAddr;
+
+ @BeforeClass
+ public static void setUpClass() {
+ expansionAddr =
+ String.format("localhost:%s",
Integer.valueOf(System.getProperty("expansionPort")));
+ }
+
+ @Before
+ public void setUp() {
+ waitForReady();
+ }
+
+ @After
+ public void tearDown() {
+ pipelineResult = testPipeline.run();
+ pipelineResult.waitUntilFinish();
+ assertThat(pipelineResult.getState(), equalTo(PipelineResult.State.DONE));
+ }
+
+ private void waitForReady() {
+ try {
+ ManagedChannel channel =
ManagedChannelBuilder.forTarget(expansionAddr).build();
+ ConnectivityState state = channel.getState(true);
+ for (int retry = 0; retry < 30 && state != ConnectivityState.READY;
retry++) {
+ Thread.sleep(500);
+ state = channel.getState(true);
+ }
+ channel.shutdownNow();
+ } catch (InterruptedException e) {
+ throw new RuntimeException("interrupted.");
+ }
+ }
+
+ @Test
+ @Category({ValidatesRunner.class, UsesPythonExpansionService.class})
+ public void testDataframeSum() {
+ Schema schema =
+ Schema.of(
+ Schema.Field.of("a", Schema.FieldType.INT64),
+ Schema.Field.of("b", Schema.FieldType.INT32));
+ Row foo1 = Row.withSchema(schema).withFieldValue("a",
100L).withFieldValue("b", 1).build();
+ Row foo2 = Row.withSchema(schema).withFieldValue("a",
100L).withFieldValue("b", 2).build();
+ Row foo3 = Row.withSchema(schema).withFieldValue("a",
100L).withFieldValue("b", 3).build();
+ Row bar4 = Row.withSchema(schema).withFieldValue("a",
200L).withFieldValue("b", 4).build();
+ PCollection<Row> col =
+ testPipeline
+ .apply(Create.of(foo1, foo2, bar4))
+ .setRowSchema(schema)
+ .apply(
+ PythonExternalTransform.<PCollection<Row>,
PCollection<Row>>from(
Review Comment:
I think we should add a Java wrapper to simplify this for users (even though
that might end up being a small class).
##########
examples/java/src/main/java/org/apache/beam/examples/multilang/PythonDataframeWordCount.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.multilang;
+
+import org.apache.beam.examples.common.ExampleUtils;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.python.PythonExternalTransform;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation.Required;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.util.PythonCallableSource;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * An example that counts words in Shakespeare and utilizes a Python external
transform.
+ *
+ * <p>This class, {@link PythonDataframeWordCount}, uses Python
DataframeTransform to count words
+ * from the input text file. The Python expansion service provided by
--expansionService must allow
+ * the expansion of apache_beam.dataframe.transforms.DataframeTransform (which
can be done by
+ * passing --fully_qualified_name_glob commandline option when launching the
expansion service).
+ *
+ * <p>Note that, for using Dataflow Runner, you should specify the following
two additional
+ * arguments:
+ *
+ * <pre>{@code
+ * --experiments=use_runner_v2
+ *
--sdkHarnessContainerImageOverrides=.*python.*,gcr.io/apache-beam-testing/beam-sdk/beam_python3.8_sdk:latest
+ * }</pre>
+ */
+public class PythonDataframeWordCount {
+
+ // Extract the words and create the rows for counting.
+ static class ExtractWordsFn extends DoFn<String, Row> {
+ public static final Schema SCHEMA =
+ Schema.of(
+ Schema.Field.of("word", Schema.FieldType.STRING),
+ Schema.Field.of("count", Schema.FieldType.INT32));
+ private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class,
"emptyLines");
+ private final Distribution lineLenDist =
+ Metrics.distribution(ExtractWordsFn.class, "lineLenDistro");
+
+ @ProcessElement
+ public void processElement(@Element String element, OutputReceiver<Row>
receiver) {
+ lineLenDist.update(element.length());
+ if (element.trim().isEmpty()) {
+ emptyLines.inc();
+ }
+
+ // Split the line into words.
+ String[] words = element.split(ExampleUtils.TOKENIZER_PATTERN, -1);
+
+ // Output each word encountered into the output PCollection.
+ for (String word : words) {
+ if (!word.isEmpty()) {
+ receiver.output(
+ Row.withSchema(SCHEMA)
+ .withFieldValue("word", word)
+ .withFieldValue("count", 1)
+ .build());
+ }
+ }
+ }
+ }
+
+ /** A SimpleFunction that converts a counted row into a printable string. */
+ public static class FormatAsTextFn extends SimpleFunction<Row, String> {
+ @Override
+ public String apply(Row input) {
+ return input.getString("word") + ": " + input.getInt32("count");
+ }
+ }
+
+ /** Options supported by {@link PythonDataframeWordCount}. */
+ public interface WordCountOptions extends PipelineOptions {
+
+ /**
+ * By default, this example reads from a public dataset containing the
text of King Lear. Set
+ * this option to choose a different input file or glob.
+ */
+ @Description("Path of the file to read from")
+ @Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
+ String getInputFile();
+
+ void setInputFile(String value);
+
+ /** Set this required option to specify where to write the output. */
+ @Description("Path of the file to write to")
+ @Required
+ String getOutput();
+
+ void setOutput(String value);
+
+ /** Set this required option to specify Python expansion service URL. */
+ @Description("URL of Python expansion service")
+ @Required
Review Comment:
Please make this option so that the example can just worth with the default
expansion service.
##########
runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ValidateRunnerXlangTest.java:
##########
@@ -287,7 +287,11 @@ protected void pythonDependenciesTest(Pipeline pipeline) {
@RunWith(JUnit4.class)
public static class SingleInputOutputTest extends
ValidateRunnerXlangTestBase {
@Test
- @Category({ValidatesRunner.class, UsesCrossLanguageTransforms.class})
+ @Category({
+ ValidatesRunner.class,
Review Comment:
Is there value in running both Java and Python pipelines with Java and
Python expansion services ? I wonder if we can simplify (and hopefully reduce
flakiness) by running Java tests only with Python expansion service and vice
versa.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]