Repository: incubator-beam
Updated Branches:
  refs/heads/master 217c29bfc -> cc28f0cb4


Add PTransformOverrideFactory to the Core SDK

This migrates PTransformOverrideFactory from the DirectRunner to the
Core SDK, as part of BEAM-646.

Add getOriginalToReplacements to provide a mapping from the original
outputs to the replacement outputs. This enables every replaced node to
be rewired so that it produces the original outputs.

Migrate all DirectRunner Override Factories to the new
PTransformOverrideFactory.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3f227a0a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3f227a0a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3f227a0a

Branch: refs/heads/master
Commit: 3f227a0ad18f425767e89f88d8a1c9fdcebdd80c
Parents: 217c29b
Author: Thomas Groh <tg...@google.com>
Authored: Mon Dec 5 16:01:57 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Dec 16 14:21:49 2016 -0800

----------------------------------------------------------------------
 ...ectGBKIntoKeyedWorkItemsOverrideFactory.java |  7 ++--
 .../direct/DirectGroupByKeyOverrideFactory.java |  3 +-
 .../beam/runners/direct/DirectRunner.java       |  7 +++-
 .../direct/PTransformOverrideFactory.java       | 35 -----------------
 .../direct/ParDoMultiOverrideFactory.java       |  9 +++--
 .../ParDoSingleViaMultiOverrideFactory.java     | 11 +++---
 .../direct/TestStreamEvaluatorFactory.java      |  5 ++-
 .../runners/direct/ViewEvaluatorFactory.java    |  4 +-
 .../direct/WriteWithShardingFactory.java        |  6 ++-
 .../direct/WriteWithShardingFactoryTest.java    |  4 +-
 .../beam/sdk/annotations/Experimental.java      |  5 ++-
 .../sdk/runners/PTransformOverrideFactory.java  | 41 ++++++++++++++++++++
 12 files changed, 80 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f227a0a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
index 1fa059c..ab4c114 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
@@ -18,7 +18,8 @@
 package org.apache.beam.runners.direct;
 
 import org.apache.beam.runners.core.KeyedWorkItem;
-import org.apache.beam.runners.core.SplittableParDo;
+import org.apache.beam.runners.core.SplittableParDo.GBKIntoKeyedWorkItems;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -30,10 +31,10 @@ import org.apache.beam.sdk.values.PCollection;
 class DirectGBKIntoKeyedWorkItemsOverrideFactory<KeyT, InputT>
     implements PTransformOverrideFactory<
         PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, 
InputT>>,
-        SplittableParDo.GBKIntoKeyedWorkItems<KeyT, InputT>> {
+        GBKIntoKeyedWorkItems<KeyT, InputT>> {
   @Override
   public PTransform<PCollection<KV<KeyT, InputT>>, 
PCollection<KeyedWorkItem<KeyT, InputT>>>
-      override(SplittableParDo.GBKIntoKeyedWorkItems<KeyT, InputT> transform) {
+      getReplacementTransform(GBKIntoKeyedWorkItems<KeyT, InputT> transform) {
     return new DirectGroupByKey.DirectGroupByKeyOnly<>();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f227a0a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
index 9acf5e9..7cf3256 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.direct;
 
+import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.KV;
@@ -27,7 +28,7 @@ final class DirectGroupByKeyOverrideFactory<K, V>
     implements PTransformOverrideFactory<
         PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, GroupByKey<K, 
V>> {
   @Override
-  public PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> 
override(
+  public PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> 
getReplacementTransform(
       GroupByKey<K, V> transform) {
     return new DirectGroupByKey<>(transform);
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f227a0a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 2f84356..78163c0 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -46,6 +46,7 @@ import org.apache.beam.sdk.io.Write;
 import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.transforms.Aggregator;
@@ -284,9 +285,11 @@ public class DirectRunner extends 
PipelineRunner<DirectPipelineResult> {
   @Override
   public <OutputT extends POutput, InputT extends PInput> OutputT apply(
       PTransform<InputT, OutputT> transform, InputT input) {
-    PTransformOverrideFactory overrideFactory = 
defaultTransformOverrides.get(transform.getClass());
+    PTransformOverrideFactory<InputT, OutputT, PTransform<InputT, OutputT>> 
overrideFactory =
+        defaultTransformOverrides.get(transform.getClass());
     if (overrideFactory != null) {
-      PTransform<InputT, OutputT> customTransform = 
overrideFactory.override(transform);
+      PTransform<InputT, OutputT> customTransform =
+          overrideFactory.getReplacementTransform(transform);
       if (customTransform != transform) {
         return Pipeline.applyTransform(transform.getName(), input, 
customTransform);
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f227a0a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PTransformOverrideFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PTransformOverrideFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PTransformOverrideFactory.java
deleted file mode 100644
index 8db6e9b..0000000
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PTransformOverrideFactory.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-
-interface PTransformOverrideFactory<
-    InputT extends PInput,
-    OutputT extends POutput,
-    TransformT extends PTransform<InputT, OutputT>> {
-  /**
-   * Create a {@link PTransform} override for the provided {@link PTransform} 
if applicable.
-   * Otherwise, return the input {@link PTransform}.
-   *
-   * <p>The returned PTransform must be semantically equivalent to the input 
{@link PTransform}.
-   */
-  PTransform<InputT, OutputT> override(TransformT transform);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f227a0a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 4401434..c5bc069 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -20,10 +20,12 @@ package org.apache.beam.runners.direct;
 import org.apache.beam.runners.core.SplittableParDo;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.ParDo.BoundMulti;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.values.KV;
@@ -39,12 +41,11 @@ import org.apache.beam.sdk.values.TypedPValue;
  */
 class ParDoMultiOverrideFactory<InputT, OutputT>
     implements PTransformOverrideFactory<
-        PCollection<? extends InputT>, PCollectionTuple, 
ParDo.BoundMulti<InputT, OutputT>> {
-
+        PCollection<? extends InputT>, PCollectionTuple, BoundMulti<InputT, 
OutputT>> {
   @Override
   @SuppressWarnings("unchecked")
-  public PTransform<PCollection<? extends InputT>, PCollectionTuple> override(
-      ParDo.BoundMulti<InputT, OutputT> transform) {
+  public PTransform<PCollection<? extends InputT>, PCollectionTuple> 
getReplacementTransform(
+      BoundMulti<InputT, OutputT> transform) {
 
     DoFn<InputT, OutputT> fn = transform.getFn();
     DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f227a0a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
index 5fcf49c..3ae3382 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
@@ -17,8 +17,10 @@
  */
 package org.apache.beam.runners.direct;
 
+import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.ParDo.Bound;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.TupleTag;
@@ -30,12 +32,11 @@ import org.apache.beam.sdk.values.TupleTagList;
  */
 class ParDoSingleViaMultiOverrideFactory<InputT, OutputT>
     implements PTransformOverrideFactory<
-        PCollection<? extends InputT>, PCollection<OutputT>, 
ParDo.Bound<InputT, OutputT>> {
+        PCollection<? extends InputT>, PCollection<OutputT>, Bound<InputT, 
OutputT>>{
   @Override
-  @SuppressWarnings("unchecked")
-  public PTransform<PCollection<? extends InputT>, PCollection<OutputT>> 
override(
-      ParDo.Bound<InputT, OutputT> transform) {
-    return new ParDoSingleViaMulti(transform);
+  public PTransform<PCollection<? extends InputT>, PCollection<OutputT>> 
getReplacementTransform(
+      Bound<InputT, OutputT> transform) {
+    return new ParDoSingleViaMulti<>(transform);
   }
 
   static class ParDoSingleViaMulti<InputT, OutputT>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f227a0a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
index 3601dbc..6ba65bf 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.testing.TestStream.ElementEvent;
@@ -157,8 +158,10 @@ class TestStreamEvaluatorFactory implements 
TransformEvaluatorFactory {
 
   static class DirectTestStreamFactory<T>
       implements PTransformOverrideFactory<PBegin, PCollection<T>, 
TestStream<T>> {
+
     @Override
-    public PTransform<PBegin, PCollection<T>> override(TestStream<T> 
transform) {
+    public PTransform<PBegin, PCollection<T>> getReplacementTransform(
+        TestStream<T> transform) {
       return new DirectTestStream<>(transform);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f227a0a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
index 460b1c2..96a18d7 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
@@ -24,6 +24,7 @@ import 
org.apache.beam.runners.direct.DirectRunner.PCollectionViewWriter;
 import org.apache.beam.runners.direct.StepTransformResult.Builder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -96,8 +97,9 @@ class ViewEvaluatorFactory implements 
TransformEvaluatorFactory {
   public static class ViewOverrideFactory<ElemT, ViewT>
       implements PTransformOverrideFactory<
           PCollection<ElemT>, PCollectionView<ViewT>, 
CreatePCollectionView<ElemT, ViewT>> {
+
     @Override
-    public PTransform<PCollection<ElemT>, PCollectionView<ViewT>> override(
+    public PTransform<PCollection<ElemT>, PCollectionView<ViewT>> 
getReplacementTransform(
         CreatePCollectionView<ElemT, ViewT> transform) {
       return new DirectCreatePCollectionView<>(transform);
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f227a0a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
index 3c88337..fd1c175 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
@@ -47,11 +47,13 @@ import org.joda.time.Duration;
  * of shards is the log base 10 of the number of input records, with up to 2 
additional shards.
  */
 class WriteWithShardingFactory<InputT>
-    implements PTransformOverrideFactory<PCollection<InputT>, PDone, 
Write.Bound<InputT>> {
+    implements org.apache.beam.sdk.runners.PTransformOverrideFactory<
+        PCollection<InputT>, PDone, Write.Bound<InputT>> {
   static final int MAX_RANDOM_EXTRA_SHARDS = 3;
 
   @Override
-  public PTransform<PCollection<InputT>, PDone> override(Write.Bound<InputT> 
transform) {
+  public PTransform<PCollection<InputT>, PDone> getReplacementTransform(
+      Bound<InputT> transform) {
     if (transform.getNumShards() == 0) {
       return new DynamicallyReshardedWrite<>(transform);
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f227a0a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index 1ff5de2..aeb75ed 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -121,13 +121,13 @@ public class WriteWithShardingFactoryTest {
   public void withShardingSpecifiesOriginalTransform() {
     Write.Bound<Object> original = Write.to(new TestSink()).withNumShards(3);
 
-    assertThat(factory.override(original), equalTo((Object) original));
+    assertThat(factory.getReplacementTransform(original), equalTo((Object) 
original));
   }
 
   @Test
   public void withNoShardingSpecifiedReturnsNewTransform() {
     Write.Bound<Object> original = Write.to(new TestSink());
-    assertThat(factory.override(original), not(equalTo((Object) original)));
+    assertThat(factory.getReplacementTransform(original), not(equalTo((Object) 
original)));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f227a0a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
index c55cd5e..2659659 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
@@ -85,6 +85,9 @@ public @interface Experimental {
     SPLITTABLE_DO_FN,
 
     /** Metrics-related experimental APIs. */
-    METRICS
+    METRICS,
+
+    /** Experimental runner APIs. Should not be used by pipeline authors. */
+    CORE_RUNNERS_ONLY
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f227a0a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverrideFactory.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverrideFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverrideFactory.java
new file mode 100644
index 0000000..f6e90e2
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverrideFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.beam.sdk.runners;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+/**
+ * Produces {@link PipelineRunner}-specific overrides of {@link PTransform PTransforms}, and
+ * provides mappings between original and replacement outputs.
+ */
+@Experimental(Kind.CORE_RUNNERS_ONLY)
+public interface PTransformOverrideFactory<
+    InputT extends PInput,
+    OutputT extends POutput,
+    TransformT extends PTransform<? super InputT, OutputT>> {
+  /**
+   * Returns a {@link PTransform} that produces equivalent output to the provided transform.
+   */
+  PTransform<InputT, OutputT> getReplacementTransform(TransformT transform);
+}

Reply via email to