Removes final minor usages of OldDoFn outside OldDoFn itself
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a3b5f968 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a3b5f968 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a3b5f968 Branch: refs/heads/jstorm-runner Commit: a3b5f968c1ae2e4f712bfcf200a03d8d193fd90c Parents: 3e24388 Author: Eugene Kirpichov <[email protected]> Authored: Tue Apr 11 15:06:45 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Fri Apr 14 23:34:49 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/core/AssignWindowsDoFn.java | 78 ----- .../apache/beam/runners/core/DoFnAdapters.java | 328 ------------------- .../apache/beam/runners/core/DoFnRunners.java | 2 +- .../GroupAlsoByWindowViaOutputBufferDoFn.java | 17 +- .../core/GroupAlsoByWindowViaWindowSetDoFn.java | 7 +- .../GroupAlsoByWindowViaWindowSetNewDoFn.java | 11 +- .../core/GroupAlsoByWindowsAggregators.java | 28 ++ .../runners/core/GroupAlsoByWindowsDoFn.java | 46 --- .../core/LateDataDroppingDoFnRunner.java | 3 +- ...roupAlsoByWindowViaOutputBufferDoFnTest.java | 4 +- .../core/GroupAlsoByWindowsProperties.java | 27 +- .../beam/runners/core/ReduceFnTester.java | 3 +- .../GroupAlsoByWindowEvaluatorFactory.java | 6 +- .../SparkGroupAlsoByWindowViaWindowSet.java | 9 +- .../spark/translation/SparkAssignWindowFn.java | 3 +- ...SparkGroupAlsoByWindowViaOutputBufferFn.java | 8 +- 16 files changed, 85 insertions(+), 495 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java deleted file mode 100644 index bbf3574..0000000 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.core; - -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.common.collect.Iterables; -import java.util.Collection; -import org.apache.beam.runners.core.OldDoFn.RequiresWindowAccess; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.SystemDoFnInternal; -import org.apache.beam.sdk.values.PCollection; -import org.joda.time.Instant; - -/** - * {@link OldDoFn} that tags elements of a {@link PCollection} with windows, according to the - * provided {@link WindowFn}. - * - * @param <T> Type of elements being windowed - * @param <W> Window type - */ -@SystemDoFnInternal -public class AssignWindowsDoFn<T, W extends BoundedWindow> extends OldDoFn<T, T> - implements RequiresWindowAccess { - private WindowFn<? super T, W> fn; - - public AssignWindowsDoFn(WindowFn<? super T, W> fn) { - this.fn = - checkNotNull( - fn, - "%s provided to %s cannot be null", - WindowFn.class.getSimpleName(), - AssignWindowsDoFn.class.getSimpleName()); - } - - @Override - @SuppressWarnings("unchecked") - public void processElement(final ProcessContext c) throws Exception { - Collection<W> windows = - ((WindowFn<T, W>) fn).assignWindows( - ((WindowFn<T, W>) fn).new AssignContext() { - @Override - public T element() { - return c.element(); - } - - @Override - public Instant timestamp() { - return c.timestamp(); - } - - @Override - public BoundedWindow window() { - return Iterables.getOnlyElement(c.windowingInternals().windows()); - } - }); - - c.windowingInternals() - .outputWindowedValue(c.element(), c.timestamp(), windows, PaneInfo.NO_FIRING); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java deleted file mode 100644 index 66ad736..0000000 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java +++ /dev/null @@ -1,328 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.core; - -import java.io.IOException; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.AggregatorRetriever; -import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFn.Context; -import org.apache.beam.sdk.transforms.DoFn.OnTimerContext; -import org.apache.beam.sdk.transforms.DoFn.ProcessContext; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; -import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature; -import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; -import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.Timer; -import org.apache.beam.sdk.util.state.State; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; -import org.joda.time.Duration; -import org.joda.time.Instant; - -/** - * Utility class containing adapters to/from {@link DoFn} and {@link OldDoFn}. - * - * @deprecated This class will go away when we start running {@link DoFn}'s directly (using {@link - * DoFnInvoker}) rather than via {@link OldDoFn}. - */ -@Deprecated -public class DoFnAdapters { - /** Should not be instantiated. */ - private DoFnAdapters() {} - - /** Creates an {@link OldDoFn} that delegates to the {@link DoFn}. */ - @SuppressWarnings({"unchecked", "rawtypes"}) - public static <InputT, OutputT> OldDoFn<InputT, OutputT> toOldDoFn(DoFn<InputT, OutputT> fn) { - DoFnSignature signature = DoFnSignatures.getSignature((Class) fn.getClass()); - if (signature.processElement().observesWindow()) { - return new WindowDoFnAdapter<>(fn); - } else { - return new SimpleDoFnAdapter<>(fn); - } - } - - /** - * Wraps a {@link DoFn} that doesn't require access to {@link BoundedWindow} as an {@link - * OldDoFn}. - */ - private static class SimpleDoFnAdapter<InputT, OutputT> extends OldDoFn<InputT, OutputT> { - private final DoFn<InputT, OutputT> fn; - private transient DoFnInvoker<InputT, OutputT> invoker; - - SimpleDoFnAdapter(DoFn<InputT, OutputT> fn) { - super(AggregatorRetriever.getDelegatingAggregators(fn)); - this.fn = fn; - this.invoker = DoFnInvokers.invokerFor(fn); - } - - @Override - public void setup() throws Exception { - this.invoker.invokeSetup(); - } - - @Override - public void startBundle(Context c) throws Exception { - fn.prepareForProcessing(); - invoker.invokeStartBundle(new ContextAdapter<>(fn, c)); - } - - @Override - public void finishBundle(Context c) throws Exception { - invoker.invokeFinishBundle(new ContextAdapter<>(fn, c)); - } - - @Override - public void teardown() throws Exception { - this.invoker.invokeTeardown(); - } - - @Override - public void processElement(ProcessContext c) throws Exception { - ProcessContextAdapter<InputT, OutputT> adapter = new ProcessContextAdapter<>(fn, c); - invoker.invokeProcessElement(adapter); - } - - @Override - public Duration getAllowedTimestampSkew() { - return fn.getAllowedTimestampSkew(); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - builder.delegate(fn); - } - - private void readObject(java.io.ObjectInputStream in) - throws IOException, ClassNotFoundException { - in.defaultReadObject(); - this.invoker = DoFnInvokers.invokerFor(fn); - } - } - - /** Wraps a {@link DoFn} that requires access to {@link BoundedWindow} as an {@link OldDoFn}. */ - private static class WindowDoFnAdapter<InputT, OutputT> extends SimpleDoFnAdapter<InputT, OutputT> - implements OldDoFn.RequiresWindowAccess { - - WindowDoFnAdapter(DoFn<InputT, OutputT> fn) { - super(fn); - } - } - - /** - * Wraps an {@link OldDoFn.Context} as a {@link DoFnInvoker.ArgumentProvider} inside a {@link - * DoFn.StartBundle} or {@link DoFn.FinishBundle} method, which means the extra context is - * unavailable. - */ - private static class ContextAdapter<InputT, OutputT> extends DoFn<InputT, OutputT>.Context - implements DoFnInvoker.ArgumentProvider<InputT, OutputT> { - - private OldDoFn<InputT, OutputT>.Context context; - - private ContextAdapter(DoFn<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.Context context) { - fn.super(); - this.context = context; - super.setupDelegateAggregators(); - } - - @Override - public PipelineOptions getPipelineOptions() { - return context.getPipelineOptions(); - } - - @Override - public void output(OutputT output) { - context.output(output); - } - - @Override - public void outputWithTimestamp(OutputT output, Instant timestamp) { - context.outputWithTimestamp(output, timestamp); - } - - @Override - public <T> void output(TupleTag<T> tag, T output) { - context.output(tag, output); - } - - @Override - public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { - context.outputWithTimestamp(tag, output, timestamp); - } - - @Override - protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator( - String name, - CombineFn<AggInputT, ?, AggOutputT> combiner) { - return context.createAggregatorInternal(name, combiner); - } - - @Override - public BoundedWindow window() { - // The OldDoFn doesn't allow us to ask for these outside processElement, so this - // should be unreachable. - throw new UnsupportedOperationException( - "Can only get the window in processElement; elsewhere there is no defined window."); - } - - @Override - public Context context(DoFn<InputT, OutputT> doFn) { - return this; - } - - @Override - public ProcessContext processContext(DoFn<InputT, OutputT> doFn) { - throw new UnsupportedOperationException( - "Can only get a ProcessContext in processElement"); - } - - @Override - public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) { - throw new UnsupportedOperationException( - "Timers are not supported for OldDoFn"); - } - - @Override - public RestrictionTracker<?> restrictionTracker() { - throw new UnsupportedOperationException("This is a non-splittable DoFn"); - } - - @Override - public State state(String stateId) { - throw new UnsupportedOperationException("State is not supported by this runner"); - } - - @Override - public Timer timer(String timerId) { - throw new UnsupportedOperationException("Timers are not supported by this runner"); - } - } - - /** - * Wraps an {@link OldDoFn.ProcessContext} as a {@link DoFnInvoker.ArgumentProvider} method. - */ - private static class ProcessContextAdapter<InputT, OutputT> - extends DoFn<InputT, OutputT>.ProcessContext - implements DoFnInvoker.ArgumentProvider<InputT, OutputT> { - - private OldDoFn<InputT, OutputT>.ProcessContext context; - - private ProcessContextAdapter( - DoFn<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.ProcessContext context) { - fn.super(); - this.context = context; - } - - @Override - public PipelineOptions getPipelineOptions() { - return context.getPipelineOptions(); - } - - @Override - public <T> T sideInput(PCollectionView<T> view) { - return context.sideInput(view); - } - - @Override - public void output(OutputT output) { - context.output(output); - } - - @Override - public void outputWithTimestamp(OutputT output, Instant timestamp) { - context.outputWithTimestamp(output, timestamp); - } - - @Override - public <T> void output(TupleTag<T> tag, T output) { - context.output(tag, output); - } - - @Override - public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { - context.outputWithTimestamp(tag, output, timestamp); - } - - @Override - protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator( - String name, CombineFn<AggInputT, ?, AggOutputT> combiner) { - return context.createAggregatorInternal(name, combiner); - } - - @Override - public InputT element() { - return context.element(); - } - - @Override - public Instant timestamp() { - return context.timestamp(); - } - - @Override - public PaneInfo pane() { - return context.pane(); - } - - @Override - public void updateWatermark(Instant watermark) { - throw new UnsupportedOperationException("Only splittable DoFn's can use updateWatermark()"); - } - - @Override - public BoundedWindow window() { - return context.window(); - } - - @Override - public Context context(DoFn<InputT, OutputT> doFn) { - return this; - } - - @Override - public ProcessContext processContext(DoFn<InputT, OutputT> doFn) { - return this; - } - - @Override - public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) { - throw new UnsupportedOperationException("Timers are not supported for OldDoFn"); - } - - @Override - public RestrictionTracker<?> restrictionTracker() { - throw new UnsupportedOperationException("This is a non-splittable DoFn"); - } - - @Override - public State state(String stateId) { - throw new UnsupportedOperationException("State is not supported by this runner"); - } - - @Override - public Timer timer(String timerId) { - throw new UnsupportedOperationException("Timers are not supported by this runner"); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java index b09ee08..06db6e1 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java @@ -105,7 +105,7 @@ public class DoFnRunners { /** * Returns an implementation of {@link DoFnRunner} that handles late data dropping. * - * <p>It drops elements from expired windows before they reach the underlying {@link OldDoFn}. + * <p>It drops elements from expired windows before they reach the underlying {@link DoFn}. */ public static <K, InputT, OutputT, W extends BoundedWindow> DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> lateDataDroppingRunner( http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java index 5508b2e..5bd7e2d 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java @@ -17,23 +17,34 @@ */ package org.apache.beam.runners.core; +import static org.apache.beam.runners.core.GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER; +import static org.apache.beam.runners.core.GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_LATENESS_COUNTER; + import java.util.ArrayList; import java.util.List; import org.apache.beam.runners.core.construction.Triggers; import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachines; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.SystemDoFnInternal; +import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.KV; import org.joda.time.Instant; /** - * The default batch {@link GroupAlsoByWindowsDoFn} implementation, if no specialized "fast path" - * implementation is applicable. + * The default batch {@link GroupAlsoByWindowsAggregators} implementation, if no specialized "fast + * path" implementation is applicable. */ @SystemDoFnInternal public class GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, OutputT, W extends BoundedWindow> - extends GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> { + extends OldDoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> { + protected final Aggregator<Long, Long> droppedDueToClosedWindow = + createAggregator(DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs()); + protected final Aggregator<Long, Long> droppedDueToLateness = + createAggregator(DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs()); private final WindowingStrategy<?, W> strategy; private final StateInternalsFactory<K> stateInternalsFactory; http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java index bf48df1..e6be93a 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java @@ -28,7 +28,7 @@ import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; /** - * A general {@link GroupAlsoByWindowsDoFn}. This delegates all of the logic to the + * A general {@link GroupAlsoByWindowsAggregators}. This delegates all of the logic to the * {@link ReduceFnRunner}. */ @SystemDoFnInternal @@ -46,9 +46,10 @@ public class GroupAlsoByWindowViaWindowSetDoFn< protected final Aggregator<Long, Long> droppedDueToClosedWindow = createAggregator( - GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs()); + GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs()); protected final Aggregator<Long, Long> droppedDueToLateness = - createAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs()); + createAggregator( + GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs()); private final WindowingStrategy<Object, W> windowingStrategy; private final StateInternalsFactory<K> stateInternalsFactory; http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java index 0cf6e2d..e146bfc 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java @@ -35,7 +35,7 @@ import org.apache.beam.sdk.values.TupleTag; import org.joda.time.Instant; /** - * A general {@link GroupAlsoByWindowsDoFn}. This delegates all of the logic to the + * A general {@link GroupAlsoByWindowsAggregators}. This delegates all of the logic to the * {@link ReduceFnRunner}. */ @SystemDoFnInternal @@ -61,9 +61,10 @@ public class GroupAlsoByWindowViaWindowSetNewDoFn< protected final Aggregator<Long, Long> droppedDueToClosedWindow = createAggregator( - GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs()); + GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs()); protected final Aggregator<Long, Long> droppedDueToLateness = - createAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs()); + createAggregator( + GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs()); private final WindowingStrategy<Object, W> windowingStrategy; private SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn; private transient StateInternalsFactory<K> stateInternalsFactory; @@ -144,10 +145,6 @@ public class GroupAlsoByWindowViaWindowSetNewDoFn< reduceFnRunner.persist(); } - public OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asDoFn() { - throw new RuntimeException("Not implement!"); - } - public Aggregator<Long, Long> getDroppedDueToLatenessAggregator() { return droppedDueToLateness; } http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsAggregators.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsAggregators.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsAggregators.java new file mode 100644 index 0000000..7c4f252 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsAggregators.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import static org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow; + +/** + * Standard aggregator names related to {@link GroupAlsoByWindow}. + */ +public abstract class GroupAlsoByWindowsAggregators { + public static final String DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER = "DroppedDueToClosedWindow"; + public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "DroppedDueToLateness"; +} http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java deleted file mode 100644 index 7e96136..0000000 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.core; - -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Sum; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.SystemDoFnInternal; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.KV; - -/** - * {@link OldDoFn} that merges windows and groups elements in those windows, optionally - * combining values. - * - * @param <K> key type - * @param <InputT> input value element type - * @param <OutputT> output value element type - * @param <W> window type - */ -@SystemDoFnInternal -public abstract class GroupAlsoByWindowsDoFn<K, InputT, OutputT, W extends BoundedWindow> - extends OldDoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> { - public static final String DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER = "DroppedDueToClosedWindow"; - public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "DroppedDueToLateness"; - - protected final Aggregator<Long, Long> droppedDueToClosedWindow = - createAggregator(DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs()); - protected final Aggregator<Long, Long> droppedDueToLateness = - createAggregator(DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs()); -} http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java index 4d41527..cdc7ce7 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java @@ -22,6 +22,7 @@ import com.google.common.base.Function; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.WindowTracing; @@ -32,7 +33,7 @@ import org.joda.time.Instant; /** * A customized {@link DoFnRunner} that handles late data dropping for - * a {@link KeyedWorkItem} input {@link OldDoFn}. + * a {@link KeyedWorkItem} input {@link DoFn}. * * <p>It expands windows before checking data lateness. * http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFnTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFnTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFnTest.java index cb8d494..e725cd2 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFnTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFnTest.java @@ -43,10 +43,10 @@ public class GroupAlsoByWindowViaOutputBufferDoFnTest { @Override public <W extends BoundedWindow> - GroupAlsoByWindowsDoFn<K, InputT, Iterable<InputT>, W> forStrategy( + GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, Iterable<InputT>, W> forStrategy( WindowingStrategy<?, W> windowingStrategy, StateInternalsFactory<K> stateInternalsFactory) { - return new GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, Iterable<InputT>, W>( + return new GroupAlsoByWindowViaOutputBufferDoFn<>( windowingStrategy, stateInternalsFactory, SystemReduceFn.<K, InputT, W>buffering(inputCoder)); http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java index d0a8923..a5031b8 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java @@ -57,7 +57,7 @@ import org.joda.time.Duration; import org.joda.time.Instant; /** - * Properties of {@link GroupAlsoByWindowsDoFn}. + * Properties of {@link GroupAlsoByWindowsAggregators}. * * <p>Some properties may not hold of some implementations, due to restrictions on the context in * which the implementation is applicable. For example, some {@code GroupAlsoByWindows} may not @@ -66,12 +66,13 @@ import org.joda.time.Instant; public class GroupAlsoByWindowsProperties { /** - * A factory of {@link GroupAlsoByWindowsDoFn} so that the various properties can provide the - * appropriate windowing strategy under test. + * A factory of {@link GroupAlsoByWindowsAggregators} so that the various properties can provide + * the appropriate windowing strategy under test. */ public interface GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> { - <W extends BoundedWindow> GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> forStrategy( - WindowingStrategy<?, W> strategy, StateInternalsFactory<K> stateInternalsFactory); + <W extends BoundedWindow> + GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, OutputT, W> forStrategy( + WindowingStrategy<?, W> strategy, StateInternalsFactory<K> stateInternalsFactory); } /** @@ -311,7 +312,7 @@ public class GroupAlsoByWindowsProperties { } /** - * Tests that the given {@link GroupAlsoByWindowsDoFn} implementation combines elements per + * Tests that the given {@link GroupAlsoByWindowsAggregators} implementation combines elements per * session window correctly according to the provided {@link CombineFn}. */ public static void combinesElementsPerSession( @@ -498,7 +499,7 @@ public class GroupAlsoByWindowsProperties { } /** - * Tests that the given {@link GroupAlsoByWindowsDoFn} implementation combines elements per + * Tests that the given {@link GroupAlsoByWindowsAggregators} implementation combines elements per * session window correctly according to the provided {@link CombineFn}. */ public static void combinesElementsPerSessionWithEndOfWindowTimestamp( @@ -597,7 +598,7 @@ public class GroupAlsoByWindowsProperties { private static <K, InputT, OutputT, W extends BoundedWindow> List<WindowedValue<KV<K, OutputT>>> processElement( - GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> fn, + GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, OutputT, W> fn, KV<K, Iterable<WindowedValue<InputT>>> element) throws Exception { TestProcessContext<K, InputT, OutputT, W> c = new TestProcessContext<>(fn, element); @@ -621,18 +622,18 @@ public class GroupAlsoByWindowsProperties { } /** - * A {@link GroupAlsoByWindowsDoFn.ProcessContext} providing just enough context for a {@link - * GroupAlsoByWindowsDoFn} - namely, information about the element and output via {@link - * WindowingInternals}, but no side inputs/outputs and no normal output. + * A {@link GroupAlsoByWindowViaOutputBufferDoFn.ProcessContext} providing just enough context for + * a {@link GroupAlsoByWindowsAggregators} - namely, information about the element and output via + * {@link WindowingInternals}, but no side inputs/outputs and no normal output. */ private static class TestProcessContext<K, InputT, OutputT, W extends BoundedWindow> - extends GroupAlsoByWindowsDoFn<K, InputT, OutputT, W>.ProcessContext { + extends GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, OutputT, W>.ProcessContext { private final PipelineOptions options = PipelineOptionsFactory.create(); private final KV<K, Iterable<WindowedValue<InputT>>> element; private final List<WindowedValue<KV<K, OutputT>>> output = new ArrayList<>(); private TestProcessContext( - GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> fn, + GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, OutputT, W> fn, KV<K, Iterable<WindowedValue<InputT>>> element) { fn.super(); this.element = element; http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java index 914550e..923b2c3 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java @@ -113,7 +113,8 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { private boolean autoAdvanceOutputWatermark = true; private final InMemoryLongSumAggregator droppedDueToClosedWindow = - new InMemoryLongSumAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER); + new InMemoryLongSumAggregator( + GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER); /** * Creates a {@link ReduceFnTester} for the given {@link WindowingStrategy}, creating http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java index ce7b12a..ce29709 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java @@ -24,7 +24,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import java.util.ArrayList; import java.util.Collection; -import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn; +import org.apache.beam.runners.core.GroupAlsoByWindowsAggregators; import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly; import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow; import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly; @@ -146,10 +146,10 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory { application.getTransform().getValueCoder(inputBundle.getPCollection().getCoder()); reduceFn = SystemReduceFn.buffering(valueCoder); droppedDueToClosedWindow = aggregatorChanges.createSystemAggregator(stepContext, - GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, + GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs()); droppedDueToLateness = aggregatorChanges.createSystemAggregator(stepContext, - GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER, + GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs()); } http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java index 029c28a..1b40613 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java @@ -23,7 +23,8 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn; +import org.apache.beam.runners.core.GroupAlsoByWindowsAggregators; +import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow; import org.apache.beam.runners.core.LateDataUtils; import org.apache.beam.runners.core.OutputWindowedValue; import org.apache.beam.runners.core.ReduceFnRunner; @@ -76,7 +77,7 @@ import scala.reflect.ClassTag; import scala.runtime.AbstractFunction1; /** - * An implementation of {@link org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn} + * An implementation of {@link GroupAlsoByWindow} * logic for grouping by windows and controlling trigger firings and pane accumulation. * * <p>This implementation is a composite of Spark transformations revolving around state management @@ -208,9 +209,9 @@ public class SparkGroupAlsoByWindowViaWindowSet { // use in memory Aggregators since Spark Accumulators are not resilient // in stateful operators, once done with this partition. final InMemoryLongSumAggregator droppedDueToClosedWindow = new InMemoryLongSumAggregator( - GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER); + GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER); final InMemoryLongSumAggregator droppedDueToLateness = new InMemoryLongSumAggregator( - GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER); + GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_LATENESS_COUNTER); AbstractIterator< Tuple2</*K*/ ByteArray, Tuple2<StateAndTimers, /*WV<KV<K, Itr<I>>>*/ List<byte[]>>>> http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAssignWindowFn.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAssignWindowFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAssignWindowFn.java index 18a3dd8..088b981 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAssignWindowFn.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAssignWindowFn.java @@ -29,7 +29,8 @@ import org.joda.time.Instant; /** - * An implementation of {@link org.apache.beam.runners.core.AssignWindowsDoFn} for the Spark runner. + * An implementation of {@link org.apache.beam.sdk.transforms.windowing.Window.Assign} for the Spark + * runner. */ public class SparkAssignWindowFn<T, W extends BoundedWindow> implements Function<WindowedValue<T>, WindowedValue<T>> { http://git-wip-us.apache.org/repos/asf/beam/blob/a3b5f968/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java index ccc0fa3..85adca9 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java @@ -21,8 +21,8 @@ package org.apache.beam.runners.spark.translation; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import org.apache.beam.runners.core.GroupAlsoByWindowViaOutputBufferDoFn; -import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn; +import org.apache.beam.runners.core.GroupAlsoByWindowsAggregators; +import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow; import org.apache.beam.runners.core.InMemoryTimerInternals; import org.apache.beam.runners.core.OutputWindowedValue; import org.apache.beam.runners.core.ReduceFnRunner; @@ -48,7 +48,7 @@ import org.apache.spark.api.java.function.FlatMapFunction; import org.joda.time.Instant; /** - * An implementation of {@link GroupAlsoByWindowViaOutputBufferDoFn} + * An implementation of {@link GroupAlsoByWindow} * for the Spark runner. */ public class SparkGroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends BoundedWindow> @@ -75,7 +75,7 @@ public class SparkGroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends Bounde droppedDueToClosedWindow = runtimeContext.createAggregator( accumulator, - GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, + GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs()); }
