This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 8e71e59eae495ab74c5c20f50456a5c9c5416905
Author: Luke Cwik <lc...@google.com>
AuthorDate: Wed Nov 27 10:39:12 2019 -0800

    [BEAM-2929] Ensure that the Beam Java SDK sends the property 
"use_indexed_format" to Dataflow for side inputs which use a multimap 
materialization.
---
 .../dataflow/DataflowPipelineTranslator.java       |  8 ++-
 .../dataflow/DataflowPipelineTranslatorTest.java   | 62 +++++++++++++++++++++-
 2 files changed, 67 insertions(+), 3 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 41e5cbb..45e0c33 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -83,6 +83,7 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.Materializations;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
@@ -710,8 +711,11 @@ public class DataflowPipelineTranslator {
       String generatedName = String.format("%s.out%d", stepName, 
outputInfoList.size());
 
       addString(outputInfo, PropertyNames.USER_NAME, generatedName);
-      if (value instanceof PCollection
-          && 
translator.runner.doesPCollectionRequireIndexedFormat((PCollection<?>) value)) {
+      if ((value instanceof PCollection
+              && 
translator.runner.doesPCollectionRequireIndexedFormat((PCollection<?>) value))
+          || ((value instanceof PCollectionView)
+              && (Materializations.MULTIMAP_MATERIALIZATION_URN.equals(
+                  ((PCollectionView) 
value).getViewFn().getMaterialization().getUrn())))) {
         addBoolean(outputInfo, PropertyNames.USE_INDEXED_FORMAT, true);
       }
       if (valueCoder != null) {
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 85b1e22..c5c3ddc 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -736,8 +736,8 @@ public class DataflowPipelineTranslatorTest implements 
Serializable {
   @Test
   public void testSplittableParDoTranslationFnApi() throws Exception {
     DataflowPipelineOptions options = buildPipelineOptions();
-    DataflowRunner runner = DataflowRunner.fromOptions(options);
     options.setExperiments(Arrays.asList("beam_fn_api"));
+    DataflowRunner runner = DataflowRunner.fromOptions(options);
     DataflowPipelineTranslator translator = 
DataflowPipelineTranslator.fromOptions(options);
 
     Pipeline pipeline = Pipeline.create(options);
@@ -852,6 +852,66 @@ public class DataflowPipelineTranslatorTest implements 
Serializable {
   }
 
   @Test
+  public void testToSingletonTranslationWithFnApiSideInput() throws Exception {
+    // A "change detector" test that makes sure the translation
+    // of getting a PCollectionView<T> does not change
+    // in bad ways during refactor
+
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setExperiments(Arrays.asList("beam_fn_api"));
+    DataflowPipelineTranslator translator = 
DataflowPipelineTranslator.fromOptions(options);
+
+    Pipeline pipeline = Pipeline.create(options);
+    pipeline.apply(Create.of(1)).apply(View.asSingleton());
+    DataflowRunner runner = DataflowRunner.fromOptions(options);
+    runner.replaceTransforms(pipeline);
+    Job job = translator.translate(pipeline, runner, 
Collections.emptyList()).getJob();
+    assertAllStepOutputsHaveUniqueIds(job);
+
+    List<Step> steps = job.getSteps();
+    assertEquals(14, steps.size());
+
+    Step collectionToSingletonStep = steps.get(steps.size() - 1);
+    assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
+
+    @SuppressWarnings("unchecked")
+    List<Map<String, Object>> ctsOutputs =
+        (List<Map<String, Object>>)
+            steps.get(steps.size() - 
1).getProperties().get(PropertyNames.OUTPUT_INFO);
+    assertTrue(Structs.getBoolean(Iterables.getOnlyElement(ctsOutputs), 
"use_indexed_format"));
+  }
+
+  @Test
+  public void testToIterableTranslationWithFnApiSideInput() throws Exception {
+    // A "change detector" test that makes sure the translation
+    // of getting a PCollectionView<Iterable<T>> does not change
+    // in bad ways during refactor
+
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setExperiments(Arrays.asList("beam_fn_api"));
+    DataflowPipelineTranslator translator = 
DataflowPipelineTranslator.fromOptions(options);
+
+    Pipeline pipeline = Pipeline.create(options);
+    pipeline.apply(Create.of(1, 2, 3)).apply(View.asIterable());
+
+    DataflowRunner runner = DataflowRunner.fromOptions(options);
+    runner.replaceTransforms(pipeline);
+    Job job = translator.translate(pipeline, runner, 
Collections.emptyList()).getJob();
+    assertAllStepOutputsHaveUniqueIds(job);
+
+    List<Step> steps = job.getSteps();
+    assertEquals(10, steps.size());
+
+    @SuppressWarnings("unchecked")
+    List<Map<String, Object>> ctsOutputs =
+        (List<Map<String, Object>>)
+            steps.get(steps.size() - 
1).getProperties().get(PropertyNames.OUTPUT_INFO);
+    assertTrue(Structs.getBoolean(Iterables.getOnlyElement(ctsOutputs), 
"use_indexed_format"));
+    Step collectionToSingletonStep = steps.get(steps.size() - 1);
+    assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
+  }
+
+  @Test
   public void testStepDisplayData() throws Exception {
     DataflowPipelineOptions options = buildPipelineOptions();
     DataflowPipelineTranslator translator = 
DataflowPipelineTranslator.fromOptions(options);

Reply via email to