NonNull by default in runners/core
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/87b07d83 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/87b07d83 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/87b07d83 Branch: refs/heads/tez-runner Commit: 87b07d83171f8718b556fa15ec27ce824a108505 Parents: da2a606 Author: Kenneth Knowles <k...@google.com> Authored: Sun Oct 29 21:09:08 2017 -0700 Committer: Kenneth Knowles <k...@apache.org> Committed: Thu Nov 9 15:05:50 2017 -0800 ---------------------------------------------------------------------- .../runners/apex/translation/utils/NoOpStepContext.java | 4 ++-- .../beam/runners/core/InMemoryStateInternals.java | 10 +++++----- ...utAndTimeBoundedSplittableProcessElementInvoker.java | 8 ++++---- .../org/apache/beam/runners/core/PeekingReiterator.java | 3 ++- .../org/apache/beam/runners/core/SimpleDoFnRunner.java | 2 +- .../runners/core/SimplePushbackSideInputDoFnRunner.java | 4 +++- .../runners/core/SplittableParDoViaKeyedWorkItems.java | 12 +++++++----- .../runners/core/SplittableProcessElementInvoker.java | 6 +++--- .../java/org/apache/beam/runners/core/package-info.java | 4 ++++ .../translation/functions/FlinkNoOpStepContext.java | 4 ++-- .../streaming/state/FlinkBroadcastStateInternals.java | 2 ++ .../streaming/state/FlinkSplitStateInternals.java | 2 ++ .../runners/spark/translation/SparkProcessContext.java | 4 ++-- 13 files changed, 39 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/87b07d83/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java index b49e4da..9268393 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java @@ -30,11 +30,11 @@ public class NoOpStepContext implements StepContext, Serializable { @Override public StateInternals stateInternals() { - return null; + throw new UnsupportedOperationException("stateInternals is not supported"); } @Override public TimerInternals timerInternals() { - return null; + throw new UnsupportedOperationException("timerInternals is not supported"); } } http://git-wip-us.apache.org/repos/asf/beam/blob/87b07d83/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java index 075e264..9193a74 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java @@ -59,18 +59,18 @@ import org.joda.time.Instant; @Experimental(Kind.STATE) public class InMemoryStateInternals<K> implements StateInternals { - public static <K> InMemoryStateInternals<K> forKey(K key) { + public static <K> InMemoryStateInternals<K> forKey(@Nullable K key) { return new InMemoryStateInternals<>(key); } - private final K key; + private final @Nullable K key; - protected InMemoryStateInternals(K key) { + protected InMemoryStateInternals(@Nullable K key) { this.key = key; } @Override - public K getKey() { + public @Nullable K getKey() { return key; } @@ -179,7 +179,7 @@ public class InMemoryStateInternals<K> implements StateInternals { public static final class InMemoryValue<T> implements ValueState<T>, InMemoryState<InMemoryValue<T>> { private boolean isCleared = true; - private T value = null; + private @Nullable T value = null; @Override public void clear() { http://git-wip-us.apache.org/repos/asf/beam/blob/87b07d83/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java index d830db5..c43ca47 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java @@ -161,7 +161,7 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker< // Currently we can't verify this because there are no hooks into tryClaim(). // See https://issues.apache.org/jira/browse/BEAM-2607 processContext.cancelScheduledCheckpoint(); - KV<RestrictionT, Instant> residual = processContext.getTakenCheckpoint(); + @Nullable KV<RestrictionT, Instant> residual = processContext.getTakenCheckpoint(); if (cont.shouldResume()) { if (residual == null) { // No checkpoint had been taken by the runner while the ProcessElement call ran, however @@ -207,13 +207,13 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker< // even if these events happen almost at the same time. // This is either the result of the sole tracker.checkpoint() call, or null if // the call completed before reaching the given number of outputs or duration. - private RestrictionT checkpoint; + private @Nullable RestrictionT checkpoint; // Watermark captured at the moment before checkpoint was taken, describing a lower bound // on the output from "checkpoint". - private Instant residualWatermark; + private @Nullable Instant residualWatermark; // A handle on the scheduled action to take a checkpoint. private Future<?> scheduledCheckpoint; - private Instant lastReportedWatermark; + private @Nullable Instant lastReportedWatermark; public ProcessContext(WindowedValue<InputT> element, TrackerT tracker) { fn.super(); http://git-wip-us.apache.org/repos/asf/beam/blob/87b07d83/runners/core-java/src/main/java/org/apache/beam/runners/core/PeekingReiterator.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PeekingReiterator.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PeekingReiterator.java index fcdff3b..123a8d5 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PeekingReiterator.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PeekingReiterator.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import java.util.NoSuchElementException; +import javax.annotation.Nullable; import org.apache.beam.sdk.util.common.Reiterator; /** @@ -29,7 +30,7 @@ import org.apache.beam.sdk.util.common.Reiterator; * @param <T> the type of elements returned by this iterator */ public final class PeekingReiterator<T> implements Reiterator<T> { - private T nextElement; + private @Nullable T nextElement; private boolean nextElementComputed; private final Reiterator<T> iterator; http://git-wip-us.apache.org/repos/asf/beam/blob/87b07d83/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index c3bfef6..6ae6754 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -546,7 +546,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out private final TimeDomain timeDomain; /** Lazily initialized; should only be accessed via {@link #getNamespace()}. */ - private StateNamespace namespace; + private @Nullable StateNamespace namespace; /** * The state namespace for this context. http://git-wip-us.apache.org/repos/asf/beam/blob/87b07d83/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java index 3f77f7d..591a6a2 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.Set; +import javax.annotation.Nullable; import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; @@ -39,7 +40,8 @@ public class SimplePushbackSideInputDoFnRunner<InputT, OutputT> private final Collection<PCollectionView<?>> views; private final ReadyCheckingSideInputReader sideInputReader; - private Set<BoundedWindow> notReadyWindows; + // Initialized in startBundle() + private @Nullable Set<BoundedWindow> notReadyWindows; public static <InputT, OutputT> SimplePushbackSideInputDoFnRunner<InputT, OutputT> create( DoFnRunner<InputT, OutputT> underlying, http://git-wip-us.apache.org/repos/asf/beam/blob/87b07d83/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java index 400df19..6c7f671 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java @@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; import java.util.List; import java.util.Map; +import javax.annotation.Nullable; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.construction.PTransformReplacements; import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform; @@ -244,12 +245,13 @@ public class SplittableParDoViaKeyedWorkItems { private final Coder<RestrictionT> restrictionCoder; private final WindowingStrategy<InputT, ?> inputWindowingStrategy; - private transient StateInternalsFactory<String> stateInternalsFactory; - private transient TimerInternalsFactory<String> timerInternalsFactory; - private transient SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, TrackerT> + private transient @Nullable StateInternalsFactory<String> stateInternalsFactory; + private transient @Nullable TimerInternalsFactory<String> timerInternalsFactory; + private transient @Nullable SplittableProcessElementInvoker< + InputT, OutputT, RestrictionT, TrackerT> processElementInvoker; - private transient DoFnInvoker<InputT, OutputT> invoker; + private transient @Nullable DoFnInvoker<InputT, OutputT> invoker; public ProcessFn( DoFn<InputT, OutputT> fn, @@ -376,7 +378,7 @@ public class SplittableParDoViaKeyedWorkItems { return; } restrictionState.write(result.getResidualRestriction()); - Instant futureOutputWatermark = result.getFutureOutputWatermark(); + @Nullable Instant futureOutputWatermark = result.getFutureOutputWatermark(); if (futureOutputWatermark == null) { futureOutputWatermark = elementAndRestriction.getKey().getTimestamp(); } http://git-wip-us.apache.org/repos/asf/beam/blob/87b07d83/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java index 7732df3..5b9cbf2 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java @@ -37,12 +37,12 @@ public abstract class SplittableProcessElementInvoker< @Nullable private final RestrictionT residualRestriction; private final DoFn.ProcessContinuation continuation; - private final Instant futureOutputWatermark; + private final @Nullable Instant futureOutputWatermark; public Result( @Nullable RestrictionT residualRestriction, DoFn.ProcessContinuation continuation, - Instant futureOutputWatermark) { + @Nullable Instant futureOutputWatermark) { this.continuation = checkNotNull(continuation); if (continuation.shouldResume()) { checkNotNull(residualRestriction); @@ -65,7 +65,7 @@ public abstract class SplittableProcessElementInvoker< return continuation; } - public Instant getFutureOutputWatermark() { + public @Nullable Instant getFutureOutputWatermark() { return futureOutputWatermark; } } http://git-wip-us.apache.org/repos/asf/beam/blob/87b07d83/runners/core-java/src/main/java/org/apache/beam/runners/core/package-info.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/package-info.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/package-info.java index d250a6a..47e96dc 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/package-info.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/package-info.java @@ -19,4 +19,8 @@ /** * Provides utilities for Beam runner authors. */ +@DefaultAnnotation(NonNull.class) package org.apache.beam.runners.core; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; http://git-wip-us.apache.org/repos/asf/beam/blob/87b07d83/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java index 9c7b636..fd45f32 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java @@ -28,12 +28,12 @@ public class FlinkNoOpStepContext implements StepContext { @Override public StateInternals stateInternals() { - return null; + throw new UnsupportedOperationException("stateInternals is not supported"); } @Override public TimerInternals timerInternals() { - return null; + throw new UnsupportedOperationException("timerInternals is not supported"); } } http://git-wip-us.apache.org/repos/asf/beam/blob/87b07d83/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java index 6cc2429..a6da211 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import javax.annotation.Nullable; import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateTag; @@ -77,6 +78,7 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals { } @Override + @Nullable public K getKey() { return null; } http://git-wip-us.apache.org/repos/asf/beam/blob/87b07d83/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java index 09e59fd..48b9613 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java @@ -19,6 +19,7 @@ package org.apache.beam.runners.flink.translation.wrappers.streaming.state; import com.google.common.collect.Iterators; import java.util.Collections; +import javax.annotation.Nullable; import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateTag; @@ -61,6 +62,7 @@ public class FlinkSplitStateInternals<K> implements StateInternals { } @Override + @Nullable public K getKey() { return null; } http://git-wip-us.apache.org/repos/asf/beam/blob/87b07d83/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java index 729eb1c..8b85155 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java @@ -106,12 +106,12 @@ class SparkProcessContext<FnInputT, FnOutputT, OutputT> { @Override public StateInternals stateInternals() { - return null; + throw new UnsupportedOperationException("stateInternals not supported"); } @Override public TimerInternals timerInternals() { - return null; + throw new UnsupportedOperationException("timerInternals not supported"); } }