[ 
https://issues.apache.org/jira/browse/BEAM-5859?focusedWorklogId=171679&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-171679
 ]

ASF GitHub Bot logged work on BEAM-5859:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 03/Dec/18 19:10
            Start Date: 03/Dec/18 19:10
    Worklog Time Spent: 10m 
      Work Description: tweise closed pull request #7150: [BEAM-5859] Improve 
operator names for portable pipelines
URL: https://github.com/apache/beam/pull/7150
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExecutableStageTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExecutableStageTranslation.java
index c9bb4f6ea030..09d1a991c279 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExecutableStageTranslation.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExecutableStageTranslation.java
@@ -39,4 +39,25 @@ public static ExecutableStagePayload 
getExecutableStagePayload(
     checkArgument(ExecutableStage.URN.equals(transform.getSpec().getUrn()));
     return ExecutableStagePayload.parseFrom(transform.getSpec().getPayload());
   }
+
+  public static String generateNameFromStagePayload(ExecutableStagePayload 
stagePayload) {
+    StringBuilder sb = new StringBuilder();
+    RunnerApi.Components components = stagePayload.getComponents();
+    final int transformsCount = stagePayload.getTransformsCount();
+    sb.append("[").append(transformsCount).append("]");
+    sb.append("{");
+    for (int i = 0; i < transformsCount; i++) {
+      String name = 
components.getTransformsOrThrow(stagePayload.getTransforms(i)).getUniqueName();
+      // Python: Remove the 'ref_AppliedPTransform_' prefix which just makes 
the name longer
+      name = name.replaceFirst("^ref_AppliedPTransform_", "");
+      // Java: Remove the 'ParMultiDo(Anonymous)' suffix which just makes the 
name longer
+      name = name.replaceFirst("/ParMultiDo\\(Anonymous\\)$", "");
+      sb.append(name);
+      if (i + 1 < transformsCount) {
+        sb.append(", ");
+      }
+    }
+    sb.append("}");
+    return sb.toString();
+  }
 }
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ExecutableStageTranslationTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ExecutableStageTranslationTest.java
new file mode 100644
index 000000000000..c5b06f76db04
--- /dev/null
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ExecutableStageTranslationTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+import java.io.Serializable;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.junit.Test;
+
+/** Tests for {@link ExecutableStageTranslation}. */
+public class ExecutableStageTranslationTest implements Serializable {
+
+  @Test
+  /* Test for generating readable operator names during translation. */
+  public void testOperatorNameGeneration() throws Exception {
+    Pipeline p = Pipeline.create();
+    p.apply(Impulse.create())
+        // Anonymous ParDo
+        .apply(
+            ParDo.of(
+                new DoFn<byte[], String>() {
+                  @ProcessElement
+                  public void processElement(
+                      ProcessContext processContext, OutputReceiver<String> 
outputReceiver) {}
+                }))
+        // Name ParDo
+        .apply(
+            "MyName",
+            ParDo.of(
+                new DoFn<String, Integer>() {
+                  @ProcessElement
+                  public void processElement(
+                      ProcessContext processContext, OutputReceiver<Integer> 
outputReceiver) {}
+                }))
+        .apply(
+            // This is how Python pipelines construct names
+            "ref_AppliedPTransform_count",
+            ParDo.of(
+                new DoFn<Integer, Integer>() {
+                  @ProcessElement
+                  public void processElement(
+                      ProcessContext processContext, OutputReceiver<Integer> 
outputReceiver) {}
+                }));
+
+    ExecutableStage firstEnvStage =
+        GreedyPipelineFuser.fuse(PipelineTranslation.toProto(p))
+            .getFusedStages()
+            .stream()
+            .findFirst()
+            .get();
+    RunnerApi.ExecutableStagePayload basePayload =
+        RunnerApi.ExecutableStagePayload.parseFrom(
+            firstEnvStage.toPTransform("foo").getSpec().getPayload());
+
+    String executableStageName =
+        ExecutableStageTranslation.generateNameFromStagePayload(basePayload);
+
+    assertThat(executableStageName, is("[3]{ParDo(Anonymous), MyName, 
count}"));
+  }
+}
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
index bc1ffd36b4c2..713d09feec68 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.flink;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.beam.runners.core.construction.ExecutableStageTranslation.generateNameFromStagePayload;
 import static 
org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils.createOutputMap;
 import static 
org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils.getWindowingStrategy;
 import static 
org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils.instantiateCoder;
@@ -243,7 +244,7 @@ public void translate(BatchTranslationContext context, 
RunnerApi.Pipeline pipeli
 
     // Ensure that side effects are performed for unconsumed DataSets.
     for (DataSet<?> dataSet : context.getDanglingDataSets()) {
-      dataSet.output(new DiscardingOutputFormat<>());
+      dataSet.output(new DiscardingOutputFormat<>()).name("DiscardingOutput");
     }
   }
 
@@ -354,6 +355,8 @@ public void translate(BatchTranslationContext context, 
RunnerApi.Pipeline pipeli
             FlinkExecutableStageContext.factory(context.getPipelineOptions()),
             getWindowingStrategy(inputPCollectionId, 
components).getWindowFn().windowCoder());
 
+    final String operatorName = generateNameFromStagePayload(stagePayload);
+
     final SingleInputUdfOperator taggedDataset;
     if (stagePayload.getUserStatesCount() > 0 || stagePayload.getTimersCount() 
> 0) {
 
@@ -373,12 +376,10 @@ public void translate(BatchTranslationContext context, 
RunnerApi.Pipeline pipeli
       Grouping<WindowedValue<InputT>> groupedInput =
           inputDataSet.groupBy(new KvKeySelector<>(keyCoder));
       taggedDataset =
-          new GroupReduceOperator<>(
-              groupedInput, typeInformation, function, 
transform.getTransform().getUniqueName());
+          new GroupReduceOperator<>(groupedInput, typeInformation, function, 
operatorName);
     } else {
       taggedDataset =
-          new MapPartitionOperator<>(
-              inputDataSet, typeInformation, function, 
transform.getTransform().getUniqueName());
+          new MapPartitionOperator<>(inputDataSet, typeInformation, function, 
operatorName);
     }
 
     for (SideInputId sideInputId : stagePayload.getSideInputsList()) {
@@ -406,7 +407,7 @@ public void translate(BatchTranslationContext context, 
RunnerApi.Pipeline pipeli
       // no-op sink to each to make sure they are materialized by Flink. 
However, some SDK-executed
       // stages have no runner-visible output after fusion. We handle this 
case by adding a sink
       // here.
-      taggedDataset.output(new DiscardingOutputFormat<>());
+      taggedDataset.output(new 
DiscardingOutputFormat<>()).name("DiscardingOutput");
     }
   }
 
@@ -547,10 +548,11 @@ private static void translateImpulse(
             WindowedValue.getFullCoder(ByteArrayCoder.of(), 
GlobalWindow.Coder.INSTANCE));
     DataSource<WindowedValue<byte[]>> dataSource =
         new DataSource<>(
-            context.getExecutionEnvironment(),
-            new ImpulseInputFormat(),
-            typeInformation,
-            transform.getTransform().getUniqueName());
+                context.getExecutionEnvironment(),
+                new ImpulseInputFormat(),
+                typeInformation,
+                transform.getTransform().getUniqueName())
+            .name("Impulse");
 
     context.addDataSet(
         
Iterables.getOnlyElement(transform.getTransform().getOutputsMap().values()), 
dataSource);
@@ -626,7 +628,7 @@ private static void pruneOutput(
             taggedDataset,
             outputType,
             pruningFunction,
-            String.format("%s/out.%d", transformName, unionTag));
+            String.format("ExtractOutput[%s]", unionTag));
     context.addDataSet(collectionId, pruningOperator);
   }
 }
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
index 83b8908a11f3..7fa2eb489982 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.flink;
 
+import static 
org.apache.beam.runners.core.construction.ExecutableStageTranslation.generateNameFromStagePayload;
 import static 
org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils.getWindowingStrategy;
 import static 
org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils.instantiateCoder;
 
@@ -414,7 +415,7 @@ private void translateImpulse(
     SingleOutputStreamOperator<WindowedValue<byte[]>> source =
         context
             .getExecutionEnvironment()
-            .addSource(new ImpulseSourceFunction(keepSourceAlive))
+            .addSource(new ImpulseSourceFunction(keepSourceAlive), "Impulse")
             .returns(typeInfo);
 
     
context.addDataStream(Iterables.getOnlyElement(pTransform.getOutputsMap().values()),
 source);
@@ -452,7 +453,9 @@ private void translateStreamingImpulse(
     SingleOutputStreamOperator<WindowedValue<byte[]>> source =
         context
             .getExecutionEnvironment()
-            .addSource(new StreamingImpulseSource(intervalMillis, 
messageCount))
+            .addSource(
+                new StreamingImpulseSource(intervalMillis, messageCount),
+                StreamingImpulseSource.class.getSimpleName())
             .returns(typeInfo);
 
     
context.addDataStream(Iterables.getOnlyElement(pTransform.getOutputsMap().values()),
 source);
@@ -496,8 +499,6 @@ private void translateStreamingImpulse(
 
   private <InputT, OutputT> void translateExecutableStage(
       String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext 
context) {
-    // TODO: Fail on stateful DoFns for now.
-    // TODO: Support stateful DoFns by inserting group-by-keys where necessary.
     // TODO: Fail on splittable DoFns.
     // TODO: Special-case single outputs to avoid multiplexing PCollections.
     RunnerApi.Components components = pipeline.getComponents();
@@ -609,14 +610,15 @@ private void translateStreamingImpulse(
             keyCoder,
             keySelector);
 
+    final String operatorName = generateNameFromStagePayload(stagePayload);
+
     if (transformedSideInputs.unionTagToView.isEmpty()) {
-      outputStream =
-          inputDataStream.transform(transform.getUniqueName(), 
outputTypeInformation, doFnOperator);
+      outputStream = inputDataStream.transform(operatorName, 
outputTypeInformation, doFnOperator);
     } else {
       outputStream =
           inputDataStream
               .connect(transformedSideInputs.unionedSideInputs.broadcast())
-              .transform(transform.getUniqueName(), outputTypeInformation, 
doFnOperator);
+              .transform(operatorName, outputTypeInformation, doFnOperator);
     }
 
     if (mainOutputTag != null) {
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslatorUtilsTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslatorUtilsTest.java
new file mode 100644
index 000000000000..8ed188b04200
--- /dev/null
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslatorUtilsTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import 
org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils;
+import org.junit.Test;
+
+/**
+ * Tests for {@link 
org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils}.
+ */
+public class FlinkPipelineTranslatorUtilsTest {
+
+  @Test
+  public void testOutputMapCreation() {
+    List<String> outputs = Arrays.asList("output1", "output2", "output3");
+    BiMap<String, Integer> outputMap = 
FlinkPipelineTranslatorUtils.createOutputMap(outputs);
+    Map<Object, Object> expected =
+        ImmutableMap.builder().put("output1", 0).put("output2", 
1).put("output3", 2).build();
+    assertThat(outputMap, is(expected));
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 171679)
    Time Spent: 2h 10m  (was: 2h)

> Improve Traceability of Pipeline translation
> --------------------------------------------
>
>                 Key: BEAM-5859
>                 URL: https://issues.apache.org/jira/browse/BEAM-5859
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-flink
>            Reporter: Maximilian Michels
>            Priority: Major
>              Labels: portability, portability-flink
>         Attachments: tfx.png, wordcount.png
>
>          Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> Users often ask how they can reason about the pipeline translation. The Flink 
> UI displays a confusingly large graph without any trace of the original Beam 
> pipeline:
> WordCount:
>  !wordcount.png! 
> TFX:
>  !tfx.png! 
> Some aspects which make understanding these graphs hard:
>  * Users don't know how the Runner maps Beam to Flink concepts
>  * The UI is awfully slow / hangs when the pipeline is reasonably complex
>  * The operator names seem to use {{transform.getUniqueName()}} which doesn't 
> generate readable names
>  * So-called chaining combines operators into a single operator, which makes 
> understanding which Beam concept belongs to which Flink concept even harder
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to