Fix access levels on SimpleDoFnRunner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bac00e1a Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bac00e1a Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bac00e1a Branch: refs/heads/master Commit: bac00e1a4e0bd7a223f4645e438848c22b830ce1 Parents: 48a7a55 Author: Kenneth Knowles <k...@google.com> Authored: Mon Oct 10 15:04:42 2016 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Tue Oct 11 15:09:21 2016 -0700 ---------------------------------------------------------------------- .../beam/runners/core/SimpleDoFnRunner.java | 43 ++++++++++---------- 1 file changed, 21 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bac00e1a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 594ca5c..8f25705 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -63,11 +63,11 @@ import org.joda.time.format.PeriodFormat; public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> { /** The {@link OldDoFn} being run. */ - public final OldDoFn<InputT, OutputT> fn; + private final OldDoFn<InputT, OutputT> fn; /** The context used for running the {@link OldDoFn}. */ - public final DoFnContext<InputT, OutputT> context; + private final DoFnContext<InputT, OutputT> context; - protected SimpleDoFnRunner(PipelineOptions options, OldDoFn<InputT, OutputT> fn, + public SimpleDoFnRunner(PipelineOptions options, OldDoFn<InputT, OutputT> fn, SideInputReader sideInputReader, OutputManager outputManager, TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> sideOutputTags, StepContext stepContext, @@ -85,16 +85,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out windowingStrategy == null ? null : windowingStrategy.getWindowFn()); } - protected void invokeProcessElement(WindowedValue<InputT> elem) { - final OldDoFn<InputT, OutputT>.ProcessContext processContext = createProcessContext(elem); - // This can contain user code. Wrap it in case it throws an exception. - try { - fn.processElement(processContext); - } catch (Exception ex) { - throw wrapUserCodeException(ex); - } - } - @Override public void startBundle() { // This can contain user code. Wrap it in case it throws an exception. @@ -121,6 +111,16 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out } } + private void invokeProcessElement(WindowedValue<InputT> elem) { + final OldDoFn<InputT, OutputT>.ProcessContext processContext = createProcessContext(elem); + // This can contain user code. Wrap it in case it throws an exception. + try { + fn.processElement(processContext); + } catch (Exception ex) { + throw wrapUserCodeException(ex); + } + } + @Override public void finishBundle() { // This can contain user code. Wrap it in case it throws an exception. @@ -135,12 +135,12 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out /** * Returns a new {@link OldDoFn.ProcessContext} for the given element. */ - protected OldDoFn<InputT, OutputT>.ProcessContext createProcessContext( + private OldDoFn<InputT, OutputT>.ProcessContext createProcessContext( WindowedValue<InputT> elem) { return new DoFnProcessContext<InputT, OutputT>(fn, context, elem); } - protected RuntimeException wrapUserCodeException(Throwable t) { + private RuntimeException wrapUserCodeException(Throwable t) { throw UserCodeException.wrapIf(!isSystemDoFn(), t); } @@ -154,8 +154,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out * @param <InputT> the type of the {@link OldDoFn} (main) input elements * @param <OutputT> the type of the {@link OldDoFn} (main) output elements */ - private static class DoFnContext<InputT, OutputT> - extends OldDoFn<InputT, OutputT>.Context { + private static class DoFnContext<InputT, OutputT> extends OldDoFn<InputT, OutputT>.Context { private static final int MAX_SIDE_OUTPUTS = 1000; final PipelineOptions options; @@ -276,7 +275,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out } } - protected <T> void sideOutputWindowedValue(TupleTag<T> tag, + private <T> void sideOutputWindowedValue(TupleTag<T> tag, T output, Instant timestamp, Collection<? extends BoundedWindow> windows, @@ -284,7 +283,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out sideOutputWindowedValue(tag, makeWindowedValue(output, timestamp, windows, pane)); } - protected <T> void sideOutputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedElem) { + private <T> void sideOutputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedElem) { if (!outputTags.contains(tag)) { // This tag wasn't declared nor was it seen before during this execution. // Thus, this must be a new, undeclared and unconsumed output. @@ -337,13 +336,13 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out } /** - * A concrete implementation of {@link OldDoFn.ProcessContext} used for - * running a {@link OldDoFn} over a single element. + * A concrete implementation of {@link OldDoFn.ProcessContext} used for running a {@link OldDoFn} + * over a single element. * * @param <InputT> the type of the {@link OldDoFn} (main) input elements * @param <OutputT> the type of the {@link OldDoFn} (main) output elements */ - static class DoFnProcessContext<InputT, OutputT> + private static class DoFnProcessContext<InputT, OutputT> extends OldDoFn<InputT, OutputT>.ProcessContext {