[ 
https://issues.apache.org/jira/browse/BEAM-5254?focusedWorklogId=152737&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-152737
 ]

ASF GitHub Bot logged work on BEAM-5254:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Oct/18 16:00
            Start Date: 09/Oct/18 16:00
    Worklog Time Spent: 10m 
      Work Description: akedin closed pull request #6292: [BEAM-5254] Add Samza Runner translator registrar and refactor config
URL: https://github.com/apache/beam/pull/6292
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub does not display it for pull requests merged from forks:

diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
index 6e67e385756..bba10ddd962 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
@@ -33,7 +33,6 @@
 import org.apache.beam.sdk.values.PValue;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
-import org.apache.samza.config.MapConfig;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.operators.ContextManager;
 import org.apache.samza.operators.StreamGraph;
@@ -76,20 +75,18 @@ public SamzaPipelineResult run(Pipeline pipeline) {
 
     // Add a dummy source for use in special cases (TestStream, empty flatten)
     final PValue dummySource = pipeline.apply("Dummy Input Source", 
Create.of("dummy"));
-
     final Map<PValue, String> idMap = PViewToIdMapper.buildIdMap(pipeline);
-    final Map<String, String> config = ConfigBuilder.buildConfig(pipeline, 
options, idMap);
 
-    final SamzaExecutionContext executionContext = new SamzaExecutionContext();
+    final ConfigBuilder configBuilder = new ConfigBuilder(options);
+    SamzaPipelineTranslator.createConfig(pipeline, idMap, configBuilder);
+    final ApplicationRunner runner = 
ApplicationRunner.fromConfig(configBuilder.build());
 
-    final ApplicationRunner runner = ApplicationRunner.fromConfig(new 
MapConfig(config));
+    final SamzaExecutionContext executionContext = new SamzaExecutionContext();
 
     final StreamApplication app =
         new StreamApplication() {
           @Override
           public void init(StreamGraph streamGraph, Config config) {
-            // TODO: we should probably not be creating the execution context 
this early since it needs
-            // to be shipped off to various tasks.
             streamGraph.withContextManager(
                 new ContextManager() {
                   @Override
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/BoundedSourceSystem.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/BoundedSourceSystem.java
index 1bef011a34f..f653cfc934b 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/BoundedSourceSystem.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/BoundedSourceSystem.java
@@ -69,28 +69,6 @@
  */
 // TODO: instrumentation for the consumer
 public class BoundedSourceSystem {
-  /**
-   * Returns the configuration required to instantiate a consumer for the 
given {@link
-   * BoundedSource}.
-   *
-   * @param id a unique id for the source. Must use only valid characters for 
a system name in
-   *     Samza.
-   * @param source the source
-   * @param coder a coder to deserialize messages received by the source's 
consumer
-   * @param <T> the type of object produced by the source consumer
-   */
-  public static <T> Map<String, String> createConfigFor(
-      String id, BoundedSource<T> source, Coder<WindowedValue<T>> coder, 
String stepName) {
-    final Map<String, String> config = new HashMap<>();
-    final String streamPrefix = "systems." + id;
-    config.put(streamPrefix + ".samza.factory", 
BoundedSourceSystem.Factory.class.getName());
-    config.put(streamPrefix + ".source", 
Base64Serializer.serializeUnchecked(source));
-    config.put(streamPrefix + ".coder", 
Base64Serializer.serializeUnchecked(coder));
-    config.put(streamPrefix + ".stepName", stepName);
-    config.put("streams." + id + ".samza.system", id);
-    config.put("streams." + id + ".samza.bounded", "true");
-    return config;
-  }
 
   private static <T> List<BoundedSource<T>> split(
       BoundedSource<T> source, SamzaPipelineOptions pipelineOptions) throws 
Exception {
@@ -414,8 +392,7 @@ private void enqueueUninterruptibly(IncomingMessageEnvelope 
envelope) {
 
   /**
    * A {@link SystemFactory} that produces a {@link BoundedSourceSystem} for a 
particular {@link
-   * BoundedSource} registered in {@link Config} via {@link 
#createConfigFor(String, BoundedSource,
-   * Coder, String)}.
+   * BoundedSource} registered in {@link Config}.
    */
   public static class Factory<T> implements SystemFactory {
     @Override
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java
index 7c819af8413..1364df1317d 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java
@@ -77,28 +77,6 @@
   private static final IncomingMessageEnvelope CHECK_LAST_EXCEPTION_ENVELOPE =
       new IncomingMessageEnvelope(null, null, null, null);
 
-  /**
-   * Returns the configuration required to instantiate a consumer for the 
given {@link
-   * UnboundedSource}.
-   *
-   * @param id a unique id for the source. Must use only valid characters for 
a system name in
-   *     Samza.
-   * @param source the source
-   * @param coder a coder to deserialize messages received by the source's 
consumer
-   * @param <T> the type of object produced by the source consumer
-   */
-  public static <T> Map<String, String> createConfigFor(
-      String id, UnboundedSource<T, ?> source, Coder<WindowedValue<T>> coder, 
String stepName) {
-    final Map<String, String> config = new HashMap<>();
-    final String streamPrefix = "systems." + id;
-    config.put(streamPrefix + ".samza.factory", 
UnboundedSourceSystem.Factory.class.getName());
-    config.put(streamPrefix + ".source", 
Base64Serializer.serializeUnchecked(source));
-    config.put(streamPrefix + ".coder", 
Base64Serializer.serializeUnchecked(coder));
-    config.put(streamPrefix + ".stepName", stepName);
-    config.put("streams." + id + ".samza.system", id);
-    return config;
-  }
-
   /**
    * For better parallelism in Samza, we need to configure a large split 
number for {@link
    * UnboundedSource} like Kafka. This will most likely make each split 
contain a single partition,
@@ -456,8 +434,7 @@ private String getOffset(UnboundedReader reader) {
 
   /**
    * A {@link SystemFactory} that produces a {@link UnboundedSourceSystem} for 
a particular {@link
-   * UnboundedSource} registered in {@link Config} via {@link 
#createConfigFor(String,
-   * UnboundedSource, Coder, String)}.
+   * UnboundedSource} registered in {@link Config}.
    */
   public static class Factory<T, CheckpointMarkT extends CheckpointMark> 
implements SystemFactory {
     @Override
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java
index f01e97ca32e..a74d1ef35cb 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java
@@ -20,11 +20,8 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
 import java.io.File;
 import java.net.URI;
 import java.util.HashMap;
@@ -32,22 +29,7 @@
 import java.util.UUID;
 import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.samza.SamzaPipelineOptions;
-import org.apache.beam.runners.samza.adapter.BoundedSourceSystem;
-import org.apache.beam.runners.samza.adapter.UnboundedSourceSystem;
 import org.apache.beam.runners.samza.util.Base64Serializer;
-import org.apache.beam.runners.samza.util.SamzaCoders;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.runners.TransformHierarchy;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PValue;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
@@ -63,24 +45,27 @@
 import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
 
 /** Builder class to generate configs for BEAM samza runner during runtime. */
-public class ConfigBuilder extends Pipeline.PipelineVisitor.Defaults {
+public class ConfigBuilder {
   private static final String APP_RUNNER_CLASS = "app.runner.class";
 
-  private final ObjectMapper objectMapper = new ObjectMapper();
-  private final Map<PValue, String> idMap;
   private final Map<String, String> config = new HashMap<>();
-  private final Pipeline pipeline;
-  private boolean foundSource = false;
+  private final SamzaPipelineOptions options;
 
-  public static Config buildConfig(
-      Pipeline pipeline, SamzaPipelineOptions options, Map<PValue, String> 
idMap) {
-    try {
-      final ConfigBuilder builder = new ConfigBuilder(idMap, pipeline);
-      pipeline.traverseTopologically(builder);
-      builder.checkFoundSource();
-      final Map<String, String> config = new HashMap<>(builder.getConfig());
+  public ConfigBuilder(SamzaPipelineOptions options) {
+    this.options = options;
+  }
+
+  public void put(String name, String property) {
+    config.put(name, property);
+  }
+
+  public void putAll(Map<String, String> properties) {
+    config.putAll(properties);
+  }
 
-      createConfigForSystemStore(config);
+  public Config build() {
+    try {
+      config.putAll(systemStoreConfig());
 
       // apply user configs
       config.putAll(createUserConfig(options));
@@ -146,97 +131,14 @@ public static Config buildConfig(
         .build();
   }
 
-  private static void createConfigForSystemStore(Map<String, String> config) {
-    config.put(
-        "stores.beamStore.factory",
-        "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory");
-    config.put("stores.beamStore.key.serde", "byteSerde");
-    config.put("stores.beamStore.msg.serde", "byteSerde");
-    config.put("serializers.registry.byteSerde.class", 
ByteSerdeFactory.class.getName());
-  }
-
-  private ConfigBuilder(Map<PValue, String> idMap, Pipeline pipeline) {
-    this.idMap = idMap;
-    this.pipeline = pipeline;
-  }
-
-  @Override
-  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
-    if (node.getTransform() instanceof Read.Bounded) {
-      foundSource = true;
-      processReadBounded(node, (Read.Bounded<?>) node.getTransform());
-    } else if (node.getTransform() instanceof Read.Unbounded) {
-      foundSource = true;
-      processReadUnbounded(node, (Read.Unbounded<?>) node.getTransform());
-    } else if (node.getTransform() instanceof ParDo.MultiOutput) {
-      processParDo((ParDo.MultiOutput<?, ?>) node.getTransform());
-    }
-  }
-
-  private <T> void processReadBounded(TransformHierarchy.Node node, 
Read.Bounded<T> transform) {
-    final String id = 
getId(Iterables.getOnlyElement(node.getOutputs().values()));
-    final BoundedSource<T> source = transform.getSource();
-
-    @SuppressWarnings("unchecked")
-    final PCollection<T> output =
-        (PCollection<T>)
-            
Iterables.getOnlyElement(node.toAppliedPTransform(pipeline).getOutputs().values());
-    final Coder<WindowedValue<T>> coder = SamzaCoders.of(output);
-
-    config.putAll(BoundedSourceSystem.createConfigFor(id, source, coder, 
node.getFullName()));
-  }
-
-  private <T> void processReadUnbounded(TransformHierarchy.Node node, 
Read.Unbounded<T> transform) {
-    final String id = 
getId(Iterables.getOnlyElement(node.getOutputs().values()));
-    final UnboundedSource<T, ?> source = transform.getSource();
-
-    @SuppressWarnings("unchecked")
-    final PCollection<T> output =
-        (PCollection<T>)
-            
Iterables.getOnlyElement(node.toAppliedPTransform(pipeline).getOutputs().values());
-    final Coder<WindowedValue<T>> coder = SamzaCoders.of(output);
-
-    config.putAll(UnboundedSourceSystem.createConfigFor(id, source, coder, 
node.getFullName()));
-  }
-
-  private void processParDo(ParDo.MultiOutput<?, ?> parDo) {
-    final DoFnSignature signature = 
DoFnSignatures.getSignature(parDo.getFn().getClass());
-    if (signature.usesState()) {
-      // set up user state configs
-      for (DoFnSignature.StateDeclaration state : 
signature.stateDeclarations().values()) {
-        String storeId = state.id();
-        config.put(
-            "stores." + storeId + ".factory",
-            "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory");
-        config.put("stores." + storeId + ".key.serde", "byteSerde");
-        config.put("stores." + storeId + ".msg.serde", "byteSerde");
-      }
-    }
-  }
-
-  private String getId(PValue pvalue) {
-    final String id = idMap.get(pvalue);
-    if (id == null) {
-      throw new IllegalStateException(String.format("Could not find id for 
pvalue: %s", pvalue));
-    }
-    return id;
-  }
-
-  private void checkFoundSource() {
-    if (!foundSource) {
-      throw new IllegalStateException("Could not find any sources in 
pipeline!");
-    }
-  }
-
-  private Map<String, String> getConfig() {
-    return config;
-  }
-
-  private String writeValueAsJsonString(Object object) {
-    try {
-      return objectMapper.writeValueAsString(object);
-    } catch (JsonProcessingException e) {
-      throw new RuntimeException(e);
-    }
+  private static Map<String, String> systemStoreConfig() {
+    return ImmutableMap.<String, String>builder()
+        .put(
+            "stores.beamStore.factory",
+            "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory")
+        .put("stores.beamStore.key.serde", "byteSerde")
+        .put("stores.beamStore.msg.serde", "byteSerde")
+        .put("serializers.registry.byteSerde.class", 
ByteSerdeFactory.class.getName())
+        .build();
   }
 }
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java
new file mode 100644
index 00000000000..17e937d94f4
--- /dev/null
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.samza.translation;
+
+import com.google.common.collect.Iterables;
+import java.util.Map;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PValue;
+
+/** Helper that provides context data such as output for config generation. */
+public class ConfigContext {
+  private final Map<PValue, String> idMap;
+  private AppliedPTransform<?, ?, ?> currentTransform;
+
+  public ConfigContext(Map<PValue, String> idMap) {
+    this.idMap = idMap;
+  }
+
+  public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) 
{
+    this.currentTransform = currentTransform;
+  }
+
+  public void clearCurrentTransform() {
+    this.currentTransform = null;
+  }
+
+  @SuppressWarnings("unchecked")
+  public <OutT extends PValue> OutT getOutput(PTransform<?, OutT> transform) {
+    return (OutT) 
Iterables.getOnlyElement(this.currentTransform.getOutputs().values());
+  }
+
+  public String getOutputId(TransformHierarchy.Node node) {
+    return 
getIdForPValue(Iterables.getOnlyElement(node.getOutputs().values()));
+  }
+
+  private String getIdForPValue(PValue pvalue) {
+    final String id = idMap.get(pvalue);
+    if (id == null) {
+      throw new IllegalArgumentException("No id mapping for value: " + pvalue);
+    }
+    return id;
+  }
+}
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
index b6b03331a8a..c3f819e1ca7 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
@@ -50,7 +50,8 @@
 
 /** Translates {@link org.apache.beam.sdk.transforms.ParDo.MultiOutput} to 
Samza {@link DoFnOp}. */
 class ParDoBoundMultiTranslator<InT, OutT>
-    implements TransformTranslator<ParDo.MultiOutput<InT, OutT>> {
+    implements TransformTranslator<ParDo.MultiOutput<InT, OutT>>,
+        TransformConfigGenerator<ParDo.MultiOutput<InT, OutT>> {
 
   @Override
   public void translate(
@@ -140,6 +141,25 @@ public void translate(
     }
   }
 
+  @Override
+  public Map<String, String> createConfig(
+      ParDo.MultiOutput<InT, OutT> transform, TransformHierarchy.Node node, 
ConfigContext ctx) {
+    final Map<String, String> config = new HashMap<>();
+    final DoFnSignature signature = 
DoFnSignatures.getSignature(transform.getFn().getClass());
+    if (signature.usesState()) {
+      // set up user state configs
+      for (DoFnSignature.StateDeclaration state : 
signature.stateDeclarations().values()) {
+        final String storeId = state.id();
+        config.put(
+            "stores." + storeId + ".factory",
+            "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory");
+        config.put("stores." + storeId + ".key.serde", "byteSerde");
+        config.put("stores." + storeId + ".msg.serde", "byteSerde");
+      }
+    }
+    return config;
+  }
+
   private <T> void registerSideOutputStream(
       MessageStream<OpMessage<RawUnionValue>> inputStream,
       PValue outputPValue,
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ReadTranslator.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ReadTranslator.java
index 894d15ebdb3..085a975512d 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ReadTranslator.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ReadTranslator.java
@@ -18,8 +18,19 @@
 
 package org.apache.beam.runners.samza.translation;
 
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.runners.samza.adapter.BoundedSourceSystem;
+import org.apache.beam.runners.samza.adapter.UnboundedSourceSystem;
+import org.apache.beam.runners.samza.util.Base64Serializer;
+import org.apache.beam.runners.samza.util.SamzaCoders;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 
@@ -27,7 +38,10 @@
  * Translates {@link org.apache.beam.sdk.io.Read} to Samza input {@link
  * org.apache.samza.operators.MessageStream}.
  */
-public class ReadTranslator<T> implements 
TransformTranslator<PTransform<PBegin, PCollection<T>>> {
+public class ReadTranslator<T>
+    implements TransformTranslator<PTransform<PBegin, PCollection<T>>>,
+        TransformConfigGenerator<PTransform<PBegin, PCollection<T>>> {
+
   @Override
   public void translate(
       PTransform<PBegin, PCollection<T>> transform,
@@ -36,4 +50,35 @@ public void translate(
     final PCollection<T> output = ctx.getOutput(transform);
     ctx.registerInputMessageStream(output);
   }
+
+  @Override
+  public Map<String, String> createConfig(
+      PTransform<PBegin, PCollection<T>> transform,
+      TransformHierarchy.Node node,
+      ConfigContext ctx) {
+    final String id = ctx.getOutputId(node);
+    final PCollection<T> output = ctx.getOutput(transform);
+    final Coder<WindowedValue<T>> coder = SamzaCoders.of(output);
+    final Source<?> source =
+        transform instanceof Read.Unbounded
+            ? ((Read.Unbounded) transform).getSource()
+            : ((Read.Bounded) transform).getSource();
+
+    final Map<String, String> config = new HashMap<>();
+    final String streamPrefix = "systems." + id;
+
+    config.put(streamPrefix + ".source", 
Base64Serializer.serializeUnchecked(source));
+    config.put(streamPrefix + ".coder", 
Base64Serializer.serializeUnchecked(coder));
+    config.put(streamPrefix + ".stepName", node.getFullName());
+    config.put("streams." + id + ".samza.system", id);
+
+    if (source instanceof BoundedSource) {
+      config.put("streams." + id + ".samza.bounded", "true");
+      config.put(streamPrefix + ".samza.factory", 
BoundedSourceSystem.Factory.class.getName());
+    } else {
+      config.put(streamPrefix + ".samza.factory", 
UnboundedSourceSystem.Factory.class.getName());
+    }
+
+    return config;
+  }
 }
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java
index 615dc1e5fae..d8369ea2614 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java
@@ -22,7 +22,9 @@
 
 import com.google.auto.service.AutoService;
 import com.google.common.collect.ImmutableMap;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.ServiceLoader;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
 import 
org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar;
 import org.apache.beam.runners.samza.SamzaPipelineOptions;
@@ -39,16 +41,15 @@
 public class SamzaPipelineTranslator {
   private static final Logger LOG = 
LoggerFactory.getLogger(SamzaPipelineTranslator.class);
 
-  private static final Map<String, TransformTranslator<?>> TRANSLATORS =
-      ImmutableMap.<String, TransformTranslator<?>>builder()
-          .put(PTransformTranslation.READ_TRANSFORM_URN, new ReadTranslator())
-          .put(PTransformTranslation.PAR_DO_TRANSFORM_URN, new 
ParDoBoundMultiTranslator())
-          .put(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, new 
GroupByKeyTranslator())
-          .put(PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN, new 
GroupByKeyTranslator())
-          .put(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, new 
WindowAssignTranslator())
-          .put(PTransformTranslation.FLATTEN_TRANSFORM_URN, new 
FlattenPCollectionsTranslator())
-          .put(SamzaPublishView.SAMZA_PUBLISH_VIEW_URN, new 
SamzaPublishViewTranslator())
-          .build();
+  private static final Map<String, TransformTranslator<?>> TRANSLATORS = 
loadTranslators();
+
+  private static Map<String, TransformTranslator<?>> loadTranslators() {
+    Map<String, TransformTranslator<?>> translators = new HashMap<>();
+    for (SamzaTranslatorRegistrar registrar : 
ServiceLoader.load(SamzaTranslatorRegistrar.class)) {
+      translators.putAll(registrar.getTransformTranslators());
+    }
+    return ImmutableMap.copyOf(translators);
+  }
 
   private SamzaPipelineTranslator() {}
 
@@ -60,16 +61,70 @@ public static void translate(
       PValue dummySource) {
 
     final TranslationContext ctx = new TranslationContext(graph, idMap, 
options, dummySource);
-    final TranslationVisitor visitor = new TranslationVisitor(ctx);
+
+    final TransformVisitorFn translateFn =
+        new TransformVisitorFn() {
+          private int topologicalId = 0;
+
+          @Override
+          public <T extends PTransform<?, ?>> void apply(
+              T transform,
+              TransformHierarchy.Node node,
+              Pipeline pipeline,
+              TransformTranslator<T> translator) {
+            ctx.setCurrentTransform(node.toAppliedPTransform(pipeline));
+            ctx.setCurrentTopologicalId(topologicalId++);
+
+            translator.translate(transform, node, ctx);
+
+            ctx.clearCurrentTransform();
+          }
+        };
+    final SamzaPipelineVisitor visitor = new SamzaPipelineVisitor(translateFn);
     pipeline.traverseTopologically(visitor);
   }
 
-  private static class TranslationVisitor extends 
Pipeline.PipelineVisitor.Defaults {
-    private final TranslationContext ctx;
-    private int topologicalId = 0;
+  public static void createConfig(
+      Pipeline pipeline, Map<PValue, String> idMap, ConfigBuilder 
configBuilder) {
+    final ConfigContext ctx = new ConfigContext(idMap);
+
+    final TransformVisitorFn configFn =
+        new TransformVisitorFn() {
+          @Override
+          public <T extends PTransform<?, ?>> void apply(
+              T transform,
+              TransformHierarchy.Node node,
+              Pipeline pipeline,
+              TransformTranslator<T> translator) {
+
+            ctx.setCurrentTransform(node.toAppliedPTransform(pipeline));
+
+            if (translator instanceof TransformConfigGenerator) {
+              TransformConfigGenerator<T> configGenerator =
+                  (TransformConfigGenerator<T>) translator;
+              configBuilder.putAll(configGenerator.createConfig(transform, 
node, ctx));
+            }
+
+            ctx.clearCurrentTransform();
+          }
+        };
+    final SamzaPipelineVisitor visitor = new SamzaPipelineVisitor(configFn);
+    pipeline.traverseTopologically(visitor);
+  }
+
+  private interface TransformVisitorFn {
+    <T extends PTransform<?, ?>> void apply(
+        T transform,
+        TransformHierarchy.Node node,
+        Pipeline pipeline,
+        TransformTranslator<T> translator);
+  }
+
+  private static class SamzaPipelineVisitor extends 
Pipeline.PipelineVisitor.Defaults {
+    private TransformVisitorFn visitorFn;
 
-    private TranslationVisitor(TranslationContext ctx) {
-      this.ctx = ctx;
+    private SamzaPipelineVisitor(TransformVisitorFn visitorFn) {
+      this.visitorFn = visitorFn;
     }
 
     @Override
@@ -97,14 +152,9 @@ public void visitPrimitiveTransform(TransformHierarchy.Node 
node) {
     private <T extends PTransform<?, ?>> void applyTransform(
         T transform, TransformHierarchy.Node node, TransformTranslator<?> 
translator) {
 
-      ctx.setCurrentTransform(node.toAppliedPTransform(getPipeline()));
-      ctx.setCurrentTopologicalId(topologicalId++);
-
       @SuppressWarnings("unchecked")
       final TransformTranslator<T> typedTranslator = (TransformTranslator<T>) 
translator;
-      typedTranslator.translate(transform, node, ctx);
-
-      ctx.clearCurrentTransform();
+      visitorFn.apply(transform, node, getPipeline(), typedTranslator);
     }
 
     private static boolean canTranslate(String urn, PTransform<?, ?> 
transform) {
@@ -123,6 +173,24 @@ private static String getUrnForTransform(PTransform<?, ?> 
transform) {
     }
   }
 
+  /** Registers Samza translators. */
+  @AutoService(SamzaTranslatorRegistrar.class)
+  public static class SamzaTranslators implements SamzaTranslatorRegistrar {
+
+    @Override
+    public Map<String, TransformTranslator<?>> getTransformTranslators() {
+      return ImmutableMap.<String, TransformTranslator<?>>builder()
+          .put(PTransformTranslation.READ_TRANSFORM_URN, new ReadTranslator())
+          .put(PTransformTranslation.PAR_DO_TRANSFORM_URN, new 
ParDoBoundMultiTranslator())
+          .put(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, new 
GroupByKeyTranslator())
+          .put(PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN, new 
GroupByKeyTranslator())
+          .put(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, new 
WindowAssignTranslator())
+          .put(PTransformTranslation.FLATTEN_TRANSFORM_URN, new 
FlattenPCollectionsTranslator())
+          .put(SamzaPublishView.SAMZA_PUBLISH_VIEW_URN, new 
SamzaPublishViewTranslator())
+          .build();
+    }
+  }
+
   /** Registers classes specialized to the Samza runner. */
   @AutoService(TransformPayloadTranslatorRegistrar.class)
   public static class SamzaTransformsRegistrar implements 
TransformPayloadTranslatorRegistrar {
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaTranslatorRegistrar.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaTranslatorRegistrar.java
new file mode 100644
index 00000000000..164ee774957
--- /dev/null
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaTranslatorRegistrar.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.samza.translation;
+
+import java.util.Map;
+
+/** A registrar of TransformTranslator. */
+public interface SamzaTranslatorRegistrar {
+  Map<String, TransformTranslator<?>> getTransformTranslators();
+}
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TransformConfigGenerator.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TransformConfigGenerator.java
new file mode 100644
index 00000000000..485ab415fe3
--- /dev/null
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TransformConfigGenerator.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.samza.translation;
+
+import java.util.Map;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.PTransform;
+
+/** Generates config for a BEAM {@link PTransform}. */
+public interface TransformConfigGenerator<T extends PTransform<?, ?>> {
+  Map<String, String> createConfig(T transform, TransformHierarchy.Node node, 
ConfigContext ctx);
+}
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TransformTranslator.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TransformTranslator.java
index f2e444185f6..fb1f8c59900 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TransformTranslator.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TransformTranslator.java
@@ -22,6 +22,6 @@
 import org.apache.beam.sdk.transforms.PTransform;
 
 /** Interface of Samza translator for BEAM {@link PTransform}. */
-interface TransformTranslator<T extends PTransform<?, ?>> {
+public interface TransformTranslator<T extends PTransform<?, ?>> {
   void translate(T transform, TransformHierarchy.Node node, TranslationContext 
ctx);
 }
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java
index 5e5df087346..c77c2f50a43 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java
@@ -38,7 +38,7 @@
  * MessageStream}. It also provides other context data such as input and 
output of a {@link
  * PTransform}.
  */
-class TranslationContext {
+public class TranslationContext {
   private final StreamGraph streamGraph;
   private final Map<PValue, MessageStream<?>> messsageStreams = new 
HashMap<>();
   private final Map<PCollectionView<?>, MessageStream<?>> viewStreams = new 
HashMap<>();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 152737)
    Time Spent: 1h 20m  (was: 1h 10m)

> Add Samza Runner translator registrar and refactor config generation
> --------------------------------------------------------------------
>
>                 Key: BEAM-5254
>                 URL: https://issues.apache.org/jira/browse/BEAM-5254
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-samza
>            Reporter: Xinyu Liu
>            Assignee: Xinyu Liu
>            Priority: Major
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> Add a registrar for transform translators in the Samza Runner to allow 
> customized translators. Also refactor the config generation so that it can 
> be extended outside open-source Beam.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to