This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 8e71e59eae495ab74c5c20f50456a5c9c5416905
Author: Luke Cwik <lc...@google.com>
AuthorDate: Wed Nov 27 10:39:12 2019 -0800

    [BEAM-2929] Ensure that the Beam Java SDK sends the property "use_indexed_format" to Dataflow for side inputs which use a multimap materialization.
---
 .../dataflow/DataflowPipelineTranslator.java       |  8 ++-
 .../dataflow/DataflowPipelineTranslatorTest.java   | 62 +++++++++++++++++++++-
 2 files changed, 67 insertions(+), 3 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 41e5cbb..45e0c33 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -83,6 +83,7 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.Materializations;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
@@ -710,8 +711,11 @@ public class DataflowPipelineTranslator {
       String generatedName = String.format("%s.out%d", stepName, outputInfoList.size());
 
       addString(outputInfo, PropertyNames.USER_NAME, generatedName);
-      if (value instanceof PCollection
-          && translator.runner.doesPCollectionRequireIndexedFormat((PCollection<?>) value)) {
+      if ((value instanceof PCollection
+              && translator.runner.doesPCollectionRequireIndexedFormat((PCollection<?>) value))
+          || ((value instanceof PCollectionView)
+              && (Materializations.MULTIMAP_MATERIALIZATION_URN.equals(
+                  ((PCollectionView) value).getViewFn().getMaterialization().getUrn())))) {
         addBoolean(outputInfo, PropertyNames.USE_INDEXED_FORMAT, true);
       }
       if (valueCoder != null) {
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 85b1e22..c5c3ddc 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -736,8 +736,8 @@ public class DataflowPipelineTranslatorTest implements Serializable {
   @Test
   public void testSplittableParDoTranslationFnApi() throws Exception {
     DataflowPipelineOptions options = buildPipelineOptions();
-    DataflowRunner runner = DataflowRunner.fromOptions(options);
     options.setExperiments(Arrays.asList("beam_fn_api"));
+    DataflowRunner runner = DataflowRunner.fromOptions(options);
     DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
 
     Pipeline pipeline = Pipeline.create(options);
@@ -852,6 +852,66 @@ public class DataflowPipelineTranslatorTest implements Serializable {
   }
 
   @Test
+  public void testToSingletonTranslationWithFnApiSideInput() throws Exception {
+    // A "change detector" test that makes sure the translation
+    // of getting a PCollectionView<T> does not change
+    // in bad ways during refactor
+
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setExperiments(Arrays.asList("beam_fn_api"));
+    DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
+
+    Pipeline pipeline = Pipeline.create(options);
+    pipeline.apply(Create.of(1)).apply(View.asSingleton());
+    DataflowRunner runner = DataflowRunner.fromOptions(options);
+    runner.replaceTransforms(pipeline);
+    Job job = translator.translate(pipeline, runner, Collections.emptyList()).getJob();
+    assertAllStepOutputsHaveUniqueIds(job);
+
+    List<Step> steps = job.getSteps();
+    assertEquals(14, steps.size());
+
+    Step collectionToSingletonStep = steps.get(steps.size() - 1);
+    assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
+
+    @SuppressWarnings("unchecked")
+    List<Map<String, Object>> ctsOutputs =
+        (List<Map<String, Object>>)
+            steps.get(steps.size() - 1).getProperties().get(PropertyNames.OUTPUT_INFO);
+    assertTrue(Structs.getBoolean(Iterables.getOnlyElement(ctsOutputs), "use_indexed_format"));
+  }
+
+  @Test
+  public void testToIterableTranslationWithFnApiSideInput() throws Exception {
+    // A "change detector" test that makes sure the translation
+    // of getting a PCollectionView<Iterable<T>> does not change
+    // in bad ways during refactor
+
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setExperiments(Arrays.asList("beam_fn_api"));
+    DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
+
+    Pipeline pipeline = Pipeline.create(options);
+    pipeline.apply(Create.of(1, 2, 3)).apply(View.asIterable());
+
+    DataflowRunner runner = DataflowRunner.fromOptions(options);
+    runner.replaceTransforms(pipeline);
+    Job job = translator.translate(pipeline, runner, Collections.emptyList()).getJob();
+    assertAllStepOutputsHaveUniqueIds(job);
+
+    List<Step> steps = job.getSteps();
+    assertEquals(10, steps.size());
+
+    @SuppressWarnings("unchecked")
+    List<Map<String, Object>> ctsOutputs =
+        (List<Map<String, Object>>)
+            steps.get(steps.size() - 1).getProperties().get(PropertyNames.OUTPUT_INFO);
+    assertTrue(Structs.getBoolean(Iterables.getOnlyElement(ctsOutputs), "use_indexed_format"));
+    Step collectionToSingletonStep = steps.get(steps.size() - 1);
+    assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
+  }
+
+  @Test
   public void testStepDisplayData() throws Exception {
     DataflowPipelineOptions options = buildPipelineOptions();
     DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);