Minor cleanups in ParDoEvaluator

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1cc16b0d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1cc16b0d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1cc16b0d

Branch: refs/heads/DSL_SQL
Commit: 1cc16b0d6cea7b01b01427758eaf427cc29635b6
Parents: 3fd8890
Author: Eugene Kirpichov <kirpic...@google.com>
Authored: Mon Apr 17 12:25:02 2017 -0700
Committer: Eugene Kirpichov <kirpic...@google.com>
Committed: Tue Apr 18 18:02:06 2017 -0700

----------------------------------------------------------------------
 ...oFnLifecycleManagerRemovingTransformEvaluator.java |  6 +++---
 .../apache/beam/runners/direct/ParDoEvaluator.java    | 14 +++++---------
 .../beam/runners/direct/ParDoEvaluatorFactory.java    |  2 +-
 .../SplittableProcessElementsEvaluatorFactory.java    |  2 +-
 ...ifecycleManagerRemovingTransformEvaluatorTest.java |  8 ++++----
 .../beam/runners/direct/ParDoEvaluatorTest.java       |  4 ++--
 6 files changed, 16 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1cc16b0d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
index 9bcd569..e537962 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
@@ -31,16 +31,16 @@ import org.slf4j.LoggerFactory;
 class DoFnLifecycleManagerRemovingTransformEvaluator<InputT> implements TransformEvaluator<InputT> {
   private static final Logger LOG =
       LoggerFactory.getLogger(DoFnLifecycleManagerRemovingTransformEvaluator.class);
-  private final ParDoEvaluator<InputT, ?> underlying;
+  private final ParDoEvaluator<InputT> underlying;
   private final DoFnLifecycleManager lifecycleManager;
 
   public static <InputT> DoFnLifecycleManagerRemovingTransformEvaluator<InputT> wrapping(
-      ParDoEvaluator<InputT, ?> underlying, DoFnLifecycleManager lifecycleManager) {
+      ParDoEvaluator<InputT> underlying, DoFnLifecycleManager lifecycleManager) {
     return new DoFnLifecycleManagerRemovingTransformEvaluator<>(underlying, lifecycleManager);
   }
 
   private DoFnLifecycleManagerRemovingTransformEvaluator(
-      ParDoEvaluator<InputT, ?> underlying, DoFnLifecycleManager lifecycleManager) {
+      ParDoEvaluator<InputT> underlying, DoFnLifecycleManager lifecycleManager) {
     this.underlying = underlying;
     this.lifecycleManager = lifecycleManager;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/1cc16b0d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index 49d0723..131716f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -40,9 +40,9 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 
-class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
+class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
 
-  public static <InputT, OutputT> ParDoEvaluator<InputT, OutputT> create(
+  public static <InputT, OutputT> ParDoEvaluator<InputT> create(
       EvaluationContext evaluationContext,
       DirectStepContext stepContext,
       AppliedPTransform<?, ?, ?> application,
@@ -93,13 +93,11 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
       throw UserCodeException.wrap(e);
     }
 
-    return new ParDoEvaluator<>(
-        evaluationContext, runner, application, aggregatorChanges, outputManager, stepContext);
+    return new ParDoEvaluator<>(runner, application, aggregatorChanges, outputManager, stepContext);
   }
 
   ////////////////////////////////////////////////////////////////////////////////////////////////
 
-  private final EvaluationContext evaluationContext;
   private final PushbackSideInputDoFnRunner<InputT, ?> fnRunner;
   private final AppliedPTransform<?, ?, ?> transform;
   private final AggregatorContainer.Mutator aggregatorChanges;
@@ -109,13 +107,11 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
   private final ImmutableList.Builder<WindowedValue<InputT>> unprocessedElements;
 
   private ParDoEvaluator(
-      EvaluationContext evaluationContext,
       PushbackSideInputDoFnRunner<InputT, ?> fnRunner,
       AppliedPTransform<?, ?, ?> transform,
       AggregatorContainer.Mutator aggregatorChanges,
       BundleOutputManager outputManager,
       DirectStepContext stepContext) {
-    this.evaluationContext = evaluationContext;
     this.fnRunner = fnRunner;
     this.transform = transform;
     this.outputManager = outputManager;
@@ -153,11 +149,11 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
     } catch (Exception e) {
       throw UserCodeException.wrap(e);
     }
-    StepTransformResult.Builder resultBuilder;
+    StepTransformResult.Builder<InputT> resultBuilder;
     CopyOnAccessInMemoryStateInternals<?> state = stepContext.commitState();
     if (state != null) {
       resultBuilder =
-          StepTransformResult.withHold(transform, state.getEarliestWatermarkHold())
+          StepTransformResult.<InputT>withHold(transform, state.getEarliestWatermarkHold())
               .withState(state);
     } else {
       resultBuilder = StepTransformResult.withoutHold(transform);

http://git-wip-us.apache.org/repos/asf/beam/blob/1cc16b0d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
index 0372295..93f204a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
@@ -126,7 +126,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
         fnManager);
   }
 
-  ParDoEvaluator<InputT, OutputT> createParDoEvaluator(
+  ParDoEvaluator<InputT> createParDoEvaluator(
       AppliedPTransform<PCollection<InputT>, PCollectionTuple, ?> application,
       StructuralKey<?> key,
       List<PCollectionView<?>> sideInputs,

http://git-wip-us.apache.org/repos/asf/beam/blob/1cc16b0d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
index 64cef35..00b16dd 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -98,7 +98,7 @@ class SplittableProcessElementsEvaluatorFactory<
             .getExecutionContext(application, inputBundle.getKey())
             .getOrCreateStepContext(stepName, stepName);
 
-    ParDoEvaluator<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>
+    ParDoEvaluator<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>
         parDoEvaluator =
             delegateFactory.createParDoEvaluator(
                 application,

http://git-wip-us.apache.org/repos/asf/beam/blob/1cc16b0d/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
index d046ce5..1ac4d6d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
@@ -53,7 +53,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
 
   @Test
   public void delegatesToUnderlying() throws Exception {
-    ParDoEvaluator<Object, Object> underlying = mock(ParDoEvaluator.class);
+    ParDoEvaluator<Object> underlying = mock(ParDoEvaluator.class);
     DoFn<?, ?> original = lifecycleManager.get();
     TransformEvaluator<Object> evaluator =
         DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(underlying, lifecycleManager);
@@ -72,7 +72,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
 
   @Test
   public void removesOnExceptionInProcessElement() throws Exception {
-    ParDoEvaluator<Object, Object> underlying = mock(ParDoEvaluator.class);
+    ParDoEvaluator<Object> underlying = mock(ParDoEvaluator.class);
     doThrow(Exception.class).when(underlying).processElement(any(WindowedValue.class));
 
     DoFn<?, ?> original = lifecycleManager.get();
@@ -91,7 +91,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
 
   @Test
   public void removesOnExceptionInOnTimer() throws Exception {
-    ParDoEvaluator<Object, Object> underlying = mock(ParDoEvaluator.class);
+    ParDoEvaluator<Object> underlying = mock(ParDoEvaluator.class);
     doThrow(Exception.class)
         .when(underlying)
         .onTimer(any(TimerData.class), any(BoundedWindow.class));
@@ -114,7 +114,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
 
   @Test
   public void removesOnExceptionInFinishBundle() throws Exception {
-    ParDoEvaluator<Object, Object> underlying = mock(ParDoEvaluator.class);
+    ParDoEvaluator<Object> underlying = mock(ParDoEvaluator.class);
     doThrow(Exception.class).when(underlying).finishBundle();
 
     DoFn<?, ?> original = lifecycleManager.get();

http://git-wip-us.apache.org/repos/asf/beam/blob/1cc16b0d/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index 65a1248..2be0f9d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -98,7 +98,7 @@ public class ParDoEvaluatorTest {
     UncommittedBundle<Integer> outputBundle = bundleFactory.createBundle(output);
     when(evaluationContext.createBundle(output)).thenReturn(outputBundle);
 
-    ParDoEvaluator<Integer, Integer> evaluator =
+    ParDoEvaluator<Integer> evaluator =
         createEvaluator(singletonView, fn, output);
 
     IntervalWindow nonGlobalWindow = new IntervalWindow(new Instant(0), new Instant(10_000L));
@@ -130,7 +130,7 @@ public class ParDoEvaluatorTest {
             WindowedValue.timestampedValueInGlobalWindow(6, new Instant(2468L))));
   }
 
-  private ParDoEvaluator<Integer, Integer> createEvaluator(
+  private ParDoEvaluator<Integer> createEvaluator(
       PCollectionView<Integer> singletonView,
       RecorderFn fn,
       PCollection<Integer> output) {

Reply via email to