Minor cleanups in ParDoEvaluator
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1cc16b0d Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1cc16b0d Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1cc16b0d Branch: refs/heads/DSL_SQL Commit: 1cc16b0d6cea7b01b01427758eaf427cc29635b6 Parents: 3fd8890 Author: Eugene Kirpichov <kirpic...@google.com> Authored: Mon Apr 17 12:25:02 2017 -0700 Committer: Eugene Kirpichov <kirpic...@google.com> Committed: Tue Apr 18 18:02:06 2017 -0700 ---------------------------------------------------------------------- ...oFnLifecycleManagerRemovingTransformEvaluator.java | 6 +++--- .../apache/beam/runners/direct/ParDoEvaluator.java | 14 +++++--------- .../beam/runners/direct/ParDoEvaluatorFactory.java | 2 +- .../SplittableProcessElementsEvaluatorFactory.java | 2 +- ...ifecycleManagerRemovingTransformEvaluatorTest.java | 8 ++++---- .../beam/runners/direct/ParDoEvaluatorTest.java | 4 ++-- 6 files changed, 16 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/1cc16b0d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java index 9bcd569..e537962 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java @@ -31,16 +31,16 @@ import org.slf4j.LoggerFactory; class DoFnLifecycleManagerRemovingTransformEvaluator<InputT> implements TransformEvaluator<InputT> { private static final Logger LOG = LoggerFactory.getLogger(DoFnLifecycleManagerRemovingTransformEvaluator.class); - private final ParDoEvaluator<InputT, ?> underlying; + private final ParDoEvaluator<InputT> underlying; private final DoFnLifecycleManager lifecycleManager; public static <InputT> DoFnLifecycleManagerRemovingTransformEvaluator<InputT> wrapping( - ParDoEvaluator<InputT, ?> underlying, DoFnLifecycleManager lifecycleManager) { + ParDoEvaluator<InputT> underlying, DoFnLifecycleManager lifecycleManager) { return new DoFnLifecycleManagerRemovingTransformEvaluator<>(underlying, lifecycleManager); } private DoFnLifecycleManagerRemovingTransformEvaluator( - ParDoEvaluator<InputT, ?> underlying, DoFnLifecycleManager lifecycleManager) { + ParDoEvaluator<InputT> underlying, DoFnLifecycleManager lifecycleManager) { this.underlying = underlying; this.lifecycleManager = lifecycleManager; } http://git-wip-us.apache.org/repos/asf/beam/blob/1cc16b0d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java index 49d0723..131716f 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java @@ -40,9 +40,9 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; -class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> { +class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> { - public static <InputT, OutputT> ParDoEvaluator<InputT, OutputT> create( + public static <InputT, OutputT> ParDoEvaluator<InputT> create( EvaluationContext evaluationContext, DirectStepContext stepContext, AppliedPTransform<?, ?, ?> application, @@ -93,13 +93,11 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> { throw UserCodeException.wrap(e); } - return new ParDoEvaluator<>( - evaluationContext, runner, application, aggregatorChanges, outputManager, stepContext); + return new ParDoEvaluator<>(runner, application, aggregatorChanges, outputManager, stepContext); } //////////////////////////////////////////////////////////////////////////////////////////////// - private final EvaluationContext evaluationContext; private final PushbackSideInputDoFnRunner<InputT, ?> fnRunner; private final AppliedPTransform<?, ?, ?> transform; private final AggregatorContainer.Mutator aggregatorChanges; @@ -109,13 +107,11 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> { private final ImmutableList.Builder<WindowedValue<InputT>> unprocessedElements; private ParDoEvaluator( - EvaluationContext evaluationContext, PushbackSideInputDoFnRunner<InputT, ?> fnRunner, AppliedPTransform<?, ?, ?> transform, AggregatorContainer.Mutator aggregatorChanges, BundleOutputManager outputManager, DirectStepContext stepContext) { - this.evaluationContext = evaluationContext; this.fnRunner = fnRunner; this.transform = transform; this.outputManager = outputManager; @@ -153,11 +149,11 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> { } catch (Exception e) { throw UserCodeException.wrap(e); } - StepTransformResult.Builder resultBuilder; + StepTransformResult.Builder<InputT> resultBuilder; CopyOnAccessInMemoryStateInternals<?> state = stepContext.commitState(); if (state != null) { resultBuilder = - StepTransformResult.withHold(transform, state.getEarliestWatermarkHold()) + StepTransformResult.<InputT>withHold(transform, state.getEarliestWatermarkHold()) .withState(state); } else { resultBuilder = StepTransformResult.withoutHold(transform); http://git-wip-us.apache.org/repos/asf/beam/blob/1cc16b0d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java index 0372295..93f204a 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java @@ -126,7 +126,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator fnManager); } - ParDoEvaluator<InputT, OutputT> createParDoEvaluator( + ParDoEvaluator<InputT> createParDoEvaluator( AppliedPTransform<PCollection<InputT>, PCollectionTuple, ?> application, StructuralKey<?> key, List<PCollectionView<?>> sideInputs, http://git-wip-us.apache.org/repos/asf/beam/blob/1cc16b0d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java index 64cef35..00b16dd 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java @@ -98,7 +98,7 @@ class SplittableProcessElementsEvaluatorFactory< .getExecutionContext(application, inputBundle.getKey()) .getOrCreateStepContext(stepName, stepName); - ParDoEvaluator<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> + ParDoEvaluator<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>> parDoEvaluator = delegateFactory.createParDoEvaluator( application, http://git-wip-us.apache.org/repos/asf/beam/blob/1cc16b0d/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java index d046ce5..1ac4d6d 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java @@ -53,7 +53,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest { @Test public void delegatesToUnderlying() throws Exception { - ParDoEvaluator<Object, Object> underlying = mock(ParDoEvaluator.class); + ParDoEvaluator<Object> underlying = mock(ParDoEvaluator.class); DoFn<?, ?> original = lifecycleManager.get(); TransformEvaluator<Object> evaluator = DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(underlying, lifecycleManager); @@ -72,7 +72,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest { @Test public void removesOnExceptionInProcessElement() throws Exception { - ParDoEvaluator<Object, Object> underlying = mock(ParDoEvaluator.class); + ParDoEvaluator<Object> underlying = mock(ParDoEvaluator.class); doThrow(Exception.class).when(underlying).processElement(any(WindowedValue.class)); DoFn<?, ?> original = lifecycleManager.get(); @@ -91,7 +91,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest { @Test public void removesOnExceptionInOnTimer() throws Exception { - ParDoEvaluator<Object, Object> underlying = mock(ParDoEvaluator.class); + ParDoEvaluator<Object> underlying = mock(ParDoEvaluator.class); doThrow(Exception.class) .when(underlying) .onTimer(any(TimerData.class), any(BoundedWindow.class)); @@ -114,7 +114,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest { @Test public void removesOnExceptionInFinishBundle() throws Exception { - ParDoEvaluator<Object, Object> underlying = mock(ParDoEvaluator.class); + ParDoEvaluator<Object> underlying = mock(ParDoEvaluator.class); doThrow(Exception.class).when(underlying).finishBundle(); DoFn<?, ?> original = lifecycleManager.get(); http://git-wip-us.apache.org/repos/asf/beam/blob/1cc16b0d/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java index 65a1248..2be0f9d 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java @@ -98,7 +98,7 @@ public class ParDoEvaluatorTest { UncommittedBundle<Integer> outputBundle = bundleFactory.createBundle(output); when(evaluationContext.createBundle(output)).thenReturn(outputBundle); - ParDoEvaluator<Integer, Integer> evaluator = + ParDoEvaluator<Integer> evaluator = createEvaluator(singletonView, fn, output); IntervalWindow nonGlobalWindow = new IntervalWindow(new Instant(0), new Instant(10_000L)); @@ -130,7 +130,7 @@ public class ParDoEvaluatorTest { WindowedValue.timestampedValueInGlobalWindow(6, new Instant(2468L)))); } - private ParDoEvaluator<Integer, Integer> createEvaluator( + private ParDoEvaluator<Integer> createEvaluator( PCollectionView<Integer> singletonView, RecorderFn fn, PCollection<Integer> output) {