Repository: beam Updated Branches: refs/heads/master 420367088 -> 2f580caff
Moves PerKeyCombineFnRunners back to runners-core It is used by the Dataflow worker. However, this change moves only the non-OldDoFn related part to runners-core. The OldDoFn-related part stays in Flink. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1a3292fb Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1a3292fb Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1a3292fb Branch: refs/heads/master Commit: 1a3292fb4c57efdc6f98bf58cff9a0b42494574b Parents: 603f4fb Author: Eugene Kirpichov <kirpic...@google.com> Authored: Wed Jan 18 16:26:00 2017 -0800 Committer: Eugene Kirpichov <kirpic...@google.com> Committed: Thu Jan 19 11:43:25 2017 -0800 ---------------------------------------------------------------------- .../runners/core/PerKeyCombineFnRunner.java | 45 ---- .../runners/core/PerKeyCombineFnRunners.java | 161 +++++++++++++ .../runners/flink/OldPerKeyCombineFnRunner.java | 62 +++++ .../flink/OldPerKeyCombineFnRunners.java | 155 ++++++++++++ .../runners/flink/PerKeyCombineFnRunners.java | 239 ------------------- .../FlinkMergingNonShuffleReduceFunction.java | 8 +- .../FlinkMergingPartialReduceFunction.java | 8 +- .../functions/FlinkMergingReduceFunction.java | 8 +- .../functions/FlinkPartialReduceFunction.java | 8 +- .../functions/FlinkReduceFunction.java | 8 +- 10 files changed, 398 insertions(+), 304 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/1a3292fb/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java index 4550273..a6608a7 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java @@ -21,7 +21,6 @@ import java.io.Serializable; import java.util.Collection; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.SideInputReader; @@ -34,50 +33,6 @@ import org.apache.beam.sdk.util.SideInputReader; */ public interface PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> extends Serializable { /** - * Returns the {@link PerKeyCombineFn} it holds. - * - * <p>It can be a {@code KeyedCombineFn} or a {@code KeyedCombineFnWithContext}. - */ - PerKeyCombineFn<K, InputT, AccumT, OutputT> fn(); - - ///////////////////////////////////////////////////////////////////////////// - - /** - * Forwards the call to a {@link PerKeyCombineFn} to create the accumulator in a {@link OldDoFn}. - * - * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext} - * if it is required. - */ - AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c); - - /** - * Forwards the call to a {@link PerKeyCombineFn} to add the input in a {@link OldDoFn}. - * - * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext} - * if it is required. - */ - AccumT addInput(K key, AccumT accumulator, InputT input, OldDoFn<?, ?>.ProcessContext c); - - /** - * Forwards the call to a {@link PerKeyCombineFn} to merge accumulators in a {@link OldDoFn}. - * - * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext} - * if it is required. - */ - AccumT mergeAccumulators( - K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c); - - /** - * Forwards the call to a {@link PerKeyCombineFn} to extract the output in a {@link OldDoFn}. - * - * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext} - * if it is required. - */ - OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c); - - ///////////////////////////////////////////////////////////////////////////// - - /** * Forwards the call to a {@link PerKeyCombineFn} to create the accumulator. * * <p>It constructs a {@code CombineWithContext.Context} from http://git-wip-us.apache.org/repos/asf/beam/blob/1a3292fb/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java new file mode 100644 index 0000000..7736758 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import com.google.common.collect.Iterables; +import java.util.Collection; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; +import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn; +import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.CombineContextFactory; +import org.apache.beam.sdk.util.SideInputReader; + +/** + * Static utility methods that provide {@link PerKeyCombineFnRunner} implementations + * for different keyed combine functions. + */ +public class PerKeyCombineFnRunners { + /** + * Returns a {@link PerKeyCombineFnRunner} from a {@link PerKeyCombineFn}. + */ + public static <K, InputT, AccumT, OutputT> PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> + create(PerKeyCombineFn<K, InputT, AccumT, OutputT> perKeyCombineFn) { + if (perKeyCombineFn instanceof KeyedCombineFnWithContext) { + return new KeyedCombineFnWithContextRunner<>( + (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) perKeyCombineFn); + } else if (perKeyCombineFn instanceof KeyedCombineFn) { + return new KeyedCombineFnRunner<>( + (KeyedCombineFn<K, InputT, AccumT, OutputT>) perKeyCombineFn); + } else { + throw new IllegalStateException( + String.format("Unknown type of CombineFn: %s", perKeyCombineFn.getClass())); + } + } + + /** + * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFn}. + * + * <p>It forwards functions calls to the {@link KeyedCombineFn}. + */ + private static class KeyedCombineFnRunner<K, InputT, AccumT, OutputT> + implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> { + private final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn; + + private KeyedCombineFnRunner( + KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn) { + this.keyedCombineFn = keyedCombineFn; + } + + @Override + public String toString() { + return keyedCombineFn.toString(); + } + + @Override + public AccumT createAccumulator(K key, PipelineOptions options, + SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { + return keyedCombineFn.createAccumulator(key); + } + + @Override + public AccumT addInput(K key, AccumT accumulator, InputT input, PipelineOptions options, + SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { + return keyedCombineFn.addInput(key, accumulator, input); + } + + @Override + public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options, + SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { + return keyedCombineFn.mergeAccumulators(key, accumulators); + } + + @Override + public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options, + SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { + return keyedCombineFn.extractOutput(key, accumulator); + } + + @Override + public AccumT compact(K key, AccumT accumulator, PipelineOptions options, + SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { + return keyedCombineFn.compact(key, accumulator); + } + } + + /** + * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFnWithContext}. + * + * <p>It forwards functions calls to the {@link KeyedCombineFnWithContext}. + */ + private static class KeyedCombineFnWithContextRunner<K, InputT, AccumT, OutputT> + implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> { + private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext; + + private KeyedCombineFnWithContextRunner( + KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext) { + this.keyedCombineFnWithContext = keyedCombineFnWithContext; + } + + @Override + public String toString() { + return keyedCombineFnWithContext.toString(); + } + + @Override + public AccumT createAccumulator(K key, PipelineOptions options, SideInputReader sideInputReader, + Collection<? extends BoundedWindow> windows) { + return keyedCombineFnWithContext.createAccumulator(key, + CombineContextFactory.createFromComponents( + options, sideInputReader, Iterables.getOnlyElement(windows))); + } + + @Override + public AccumT addInput(K key, AccumT accumulator, InputT input, PipelineOptions options, + SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { + return keyedCombineFnWithContext.addInput(key, accumulator, input, + CombineContextFactory.createFromComponents( + options, sideInputReader, Iterables.getOnlyElement(windows))); + } + + @Override + public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options, + SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { + return keyedCombineFnWithContext.mergeAccumulators(key, accumulators, + CombineContextFactory.createFromComponents( + options, sideInputReader, Iterables.getOnlyElement(windows))); + } + + @Override + public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options, + SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { + return keyedCombineFnWithContext.extractOutput(key, accumulator, + CombineContextFactory.createFromComponents( + options, sideInputReader, Iterables.getOnlyElement(windows))); + } + + @Override + public AccumT compact(K key, AccumT accumulator, PipelineOptions options, + SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { + return keyedCombineFnWithContext.compact(key, accumulator, + CombineContextFactory.createFromComponents( + options, sideInputReader, Iterables.getOnlyElement(windows))); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/1a3292fb/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java new file mode 100644 index 0000000..5d676dc --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink; + +import java.io.Serializable; +import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn; +import org.apache.beam.sdk.transforms.OldDoFn; + +/** + * An interface that runs a {@link PerKeyCombineFn} with unified APIs using + * {@link OldDoFn.ProcessContext}. + */ +@Deprecated +public interface OldPerKeyCombineFnRunner<K, InputT, AccumT, OutputT> extends Serializable { + /** + * Forwards the call to a {@link PerKeyCombineFn} to create the accumulator in a {@link OldDoFn}. + * + * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext} + * if it is required. + */ + AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c); + + /** + * Forwards the call to a {@link PerKeyCombineFn} to add the input in a {@link OldDoFn}. + * + * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext} + * if it is required. + */ + AccumT addInput(K key, AccumT accumulator, InputT input, OldDoFn<?, ?>.ProcessContext c); + + /** + * Forwards the call to a {@link PerKeyCombineFn} to merge accumulators in a {@link OldDoFn}. + * + * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext} + * if it is required. + */ + AccumT mergeAccumulators( + K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c); + + /** + * Forwards the call to a {@link PerKeyCombineFn} to extract the output in a {@link OldDoFn}. + * + * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext} + * if it is required. + */ + OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c); +} http://git-wip-us.apache.org/repos/asf/beam/blob/1a3292fb/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java new file mode 100644 index 0000000..8ebeadf --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink; + +import org.apache.beam.runners.core.PerKeyCombineFnRunner; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; +import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn; +import org.apache.beam.sdk.transforms.CombineWithContext; +import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.values.PCollectionView; + +/** + * Static utility methods that provide {@link OldPerKeyCombineFnRunner} implementations + * for different keyed combine functions. + */ +@Deprecated +public class OldPerKeyCombineFnRunners { + /** + * Returns a {@link PerKeyCombineFnRunner} from a {@link PerKeyCombineFn}. + */ + public static <K, InputT, AccumT, OutputT> OldPerKeyCombineFnRunner<K, InputT, AccumT, OutputT> + create(PerKeyCombineFn<K, InputT, AccumT, OutputT> perKeyCombineFn) { + if (perKeyCombineFn instanceof KeyedCombineFnWithContext) { + return new KeyedCombineFnWithContextRunner<>( + (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) perKeyCombineFn); + } else if (perKeyCombineFn instanceof KeyedCombineFn) { + return new KeyedCombineFnRunner<>( + (KeyedCombineFn<K, InputT, AccumT, OutputT>) perKeyCombineFn); + } else { + throw new IllegalStateException( + String.format("Unknown type of CombineFn: %s", perKeyCombineFn.getClass())); + } + } + + /** Returns a {@code Combine.Context} that wraps a {@code OldDoFn.ProcessContext}. */ + private static CombineWithContext.Context createFromProcessContext( + final OldDoFn<?, ?>.ProcessContext c) { + return new CombineWithContext.Context() { + @Override + public PipelineOptions getPipelineOptions() { + return c.getPipelineOptions(); + } + + @Override + public <T> T sideInput(PCollectionView<T> view) { + return c.sideInput(view); + } + }; + } + + /** + * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFn}. + * + * <p>It forwards functions calls to the {@link KeyedCombineFn}. + */ + private static class KeyedCombineFnRunner<K, InputT, AccumT, OutputT> + implements OldPerKeyCombineFnRunner<K, InputT, AccumT, OutputT> { + private final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn; + + private KeyedCombineFnRunner( + KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn) { + this.keyedCombineFn = keyedCombineFn; + } + + @Override + public AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c) { + return keyedCombineFn.createAccumulator(key); + } + + @Override + public AccumT addInput( + K key, AccumT accumulator, InputT input, OldDoFn<?, ?>.ProcessContext c) { + return keyedCombineFn.addInput(key, accumulator, input); + } + + @Override + public AccumT mergeAccumulators( + K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c) { + return keyedCombineFn.mergeAccumulators(key, accumulators); + } + + @Override + public OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) { + return keyedCombineFn.extractOutput(key, accumulator); + } + + @Override + public String toString() { + return keyedCombineFn.toString(); + } + } + + /** + * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFnWithContext}. + * + * <p>It forwards functions calls to the {@link KeyedCombineFnWithContext}. + */ + private static class KeyedCombineFnWithContextRunner<K, InputT, AccumT, OutputT> + implements OldPerKeyCombineFnRunner<K, InputT, AccumT, OutputT> { + private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext; + + private KeyedCombineFnWithContextRunner( + KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext) { + this.keyedCombineFnWithContext = keyedCombineFnWithContext; + } + + @Override + public AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c) { + return keyedCombineFnWithContext.createAccumulator(key, + createFromProcessContext(c)); + } + + @Override + public AccumT addInput( + K key, AccumT accumulator, InputT value, OldDoFn<?, ?>.ProcessContext c) { + return keyedCombineFnWithContext.addInput(key, accumulator, value, + createFromProcessContext(c)); + } + + @Override + public AccumT mergeAccumulators( + K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c) { + return keyedCombineFnWithContext.mergeAccumulators( + key, accumulators, createFromProcessContext(c)); + } + + @Override + public OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) { + return keyedCombineFnWithContext.extractOutput(key, accumulator, + createFromProcessContext(c)); + } + + @Override + public String toString() { + return keyedCombineFnWithContext.toString(); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/1a3292fb/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PerKeyCombineFnRunners.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PerKeyCombineFnRunners.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PerKeyCombineFnRunners.java deleted file mode 100644 index f672578..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PerKeyCombineFnRunners.java +++ /dev/null @@ -1,239 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -import com.google.common.collect.Iterables; -import java.util.Collection; -import org.apache.beam.runners.core.PerKeyCombineFnRunner; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; -import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn; -import org.apache.beam.sdk.transforms.CombineWithContext; -import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; -import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.CombineContextFactory; -import org.apache.beam.sdk.util.SideInputReader; -import org.apache.beam.sdk.values.PCollectionView; - -/** - * Static utility methods that provide {@link PerKeyCombineFnRunner} implementations - * for different keyed combine functions. - */ -public class PerKeyCombineFnRunners { - /** - * Returns a {@link PerKeyCombineFnRunner} from a {@link PerKeyCombineFn}. - */ - public static <K, InputT, AccumT, OutputT> PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> - create(PerKeyCombineFn<K, InputT, AccumT, OutputT> perKeyCombineFn) { - if (perKeyCombineFn instanceof KeyedCombineFnWithContext) { - return new KeyedCombineFnWithContextRunner<>( - (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) perKeyCombineFn); - } else if (perKeyCombineFn instanceof KeyedCombineFn) { - return new KeyedCombineFnRunner<>( - (KeyedCombineFn<K, InputT, AccumT, OutputT>) perKeyCombineFn); - } else { - throw new IllegalStateException( - String.format("Unknown type of CombineFn: %s", perKeyCombineFn.getClass())); - } - } - - /** Returns a {@code Combine.Context} that wraps a {@code OldDoFn.ProcessContext}. */ - private static CombineWithContext.Context createFromProcessContext( - final OldDoFn<?, ?>.ProcessContext c) { - return new CombineWithContext.Context() { - @Override - public PipelineOptions getPipelineOptions() { - return c.getPipelineOptions(); - } - - @Override - public <T> T sideInput(PCollectionView<T> view) { - return c.sideInput(view); - } - }; - } - - /** - * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFn}. - * - * <p>It forwards functions calls to the {@link KeyedCombineFn}. - */ - private static class KeyedCombineFnRunner<K, InputT, AccumT, OutputT> - implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> { - private final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn; - - private KeyedCombineFnRunner( - KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn) { - this.keyedCombineFn = keyedCombineFn; - } - - @Override - public KeyedCombineFn<K, InputT, AccumT, OutputT> fn() { - return keyedCombineFn; - } - - @Override - public AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c) { - return keyedCombineFn.createAccumulator(key); - } - - @Override - public AccumT addInput( - K key, AccumT accumulator, InputT input, OldDoFn<?, ?>.ProcessContext c) { - return keyedCombineFn.addInput(key, accumulator, input); - } - - @Override - public AccumT mergeAccumulators( - K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c) { - return keyedCombineFn.mergeAccumulators(key, accumulators); - } - - @Override - public OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) { - return keyedCombineFn.extractOutput(key, accumulator); - } - - @Override - public String toString() { - return keyedCombineFn.toString(); - } - - @Override - public AccumT createAccumulator(K key, PipelineOptions options, - SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { - return keyedCombineFn.createAccumulator(key); - } - - @Override - public AccumT addInput(K key, AccumT accumulator, InputT input, PipelineOptions options, - SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { - return keyedCombineFn.addInput(key, accumulator, input); - } - - @Override - public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options, - SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { - return keyedCombineFn.mergeAccumulators(key, accumulators); - } - - @Override - public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options, - SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { - return keyedCombineFn.extractOutput(key, accumulator); - } - - @Override - public AccumT compact(K key, AccumT accumulator, PipelineOptions options, - SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { - return keyedCombineFn.compact(key, accumulator); - } - } - - /** - * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFnWithContext}. - * - * <p>It forwards functions calls to the {@link KeyedCombineFnWithContext}. - */ - private static class KeyedCombineFnWithContextRunner<K, InputT, AccumT, OutputT> - implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> { - private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext; - - private KeyedCombineFnWithContextRunner( - KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext) { - this.keyedCombineFnWithContext = keyedCombineFnWithContext; - } - - @Override - public KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> fn() { - return keyedCombineFnWithContext; - } - - @Override - public AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c) { - return keyedCombineFnWithContext.createAccumulator(key, - createFromProcessContext(c)); - } - - @Override - public AccumT addInput( - K key, AccumT accumulator, InputT value, OldDoFn<?, ?>.ProcessContext c) { - return keyedCombineFnWithContext.addInput(key, accumulator, value, - createFromProcessContext(c)); - } - - @Override - public AccumT mergeAccumulators( - K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c) { - return keyedCombineFnWithContext.mergeAccumulators( - key, accumulators, createFromProcessContext(c)); - } - - @Override - public OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) { - return keyedCombineFnWithContext.extractOutput(key, accumulator, - createFromProcessContext(c)); - } - - @Override - public String toString() { - return keyedCombineFnWithContext.toString(); - } - - @Override - public AccumT createAccumulator(K key, PipelineOptions options, SideInputReader sideInputReader, - Collection<? extends BoundedWindow> windows) { - return keyedCombineFnWithContext.createAccumulator(key, - CombineContextFactory.createFromComponents( - options, sideInputReader, Iterables.getOnlyElement(windows))); - } - - @Override - public AccumT addInput(K key, AccumT accumulator, InputT input, PipelineOptions options, - SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { - return keyedCombineFnWithContext.addInput(key, accumulator, input, - CombineContextFactory.createFromComponents( - options, sideInputReader, Iterables.getOnlyElement(windows))); - } - - @Override - public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options, - SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { - return keyedCombineFnWithContext.mergeAccumulators(key, accumulators, - CombineContextFactory.createFromComponents( - options, sideInputReader, Iterables.getOnlyElement(windows))); - } - - @Override - public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options, - SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { - return keyedCombineFnWithContext.extractOutput(key, accumulator, - CombineContextFactory.createFromComponents( - options, sideInputReader, Iterables.getOnlyElement(windows))); - } - - @Override - public AccumT compact(K key, AccumT accumulator, PipelineOptions options, - SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { - return keyedCombineFnWithContext.compact(key, accumulator, - CombineContextFactory.createFromComponents( - options, sideInputReader, Iterables.getOnlyElement(windows))); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/1a3292fb/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java index 6412e63..1b43172 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java @@ -24,8 +24,8 @@ import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; -import org.apache.beam.runners.core.PerKeyCombineFnRunner; -import org.apache.beam.runners.flink.PerKeyCombineFnRunners; +import org.apache.beam.runners.flink.OldPerKeyCombineFnRunner; +import org.apache.beam.runners.flink.OldPerKeyCombineFnRunners; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineFnBase; @@ -101,8 +101,8 @@ public class FlinkMergingNonShuffleReduceFunction< sideInputs, out ); - PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> combineFnRunner = - PerKeyCombineFnRunners.create(combineFn); + OldPerKeyCombineFnRunner<K, InputT, AccumT, OutputT> combineFnRunner = + OldPerKeyCombineFnRunners.create(combineFn); @SuppressWarnings("unchecked") OutputTimeFn<? super BoundedWindow> outputTimeFn = http://git-wip-us.apache.org/repos/asf/beam/blob/1a3292fb/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java index 1456eea..cf058e8 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java @@ -24,8 +24,8 @@ import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; -import org.apache.beam.runners.core.PerKeyCombineFnRunner; -import org.apache.beam.runners.flink.PerKeyCombineFnRunners; +import org.apache.beam.runners.flink.OldPerKeyCombineFnRunner; +import org.apache.beam.runners.flink.OldPerKeyCombineFnRunners; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineFnBase; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -69,8 +69,8 @@ public class FlinkMergingPartialReduceFunction<K, InputT, AccumT, W extends Inte sideInputs, out ); - PerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner = - PerKeyCombineFnRunners.create(combineFn); + OldPerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner = + OldPerKeyCombineFnRunners.create(combineFn); @SuppressWarnings("unchecked") OutputTimeFn<? super BoundedWindow> outputTimeFn = http://git-wip-us.apache.org/repos/asf/beam/blob/1a3292fb/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java index 2f56fac..4fa4578 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java @@ -26,8 +26,8 @@ import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; -import org.apache.beam.runners.core.PerKeyCombineFnRunner; -import org.apache.beam.runners.flink.PerKeyCombineFnRunners; +import org.apache.beam.runners.flink.OldPerKeyCombineFnRunner; +import org.apache.beam.runners.flink.OldPerKeyCombineFnRunners; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineFnBase; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -71,8 +71,8 @@ public class FlinkMergingReduceFunction<K, AccumT, OutputT, W extends IntervalWi sideInputs, out ); - PerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner = - PerKeyCombineFnRunners.create(combineFn); + OldPerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner = + OldPerKeyCombineFnRunners.create(combineFn); @SuppressWarnings("unchecked") OutputTimeFn<? super BoundedWindow> outputTimeFn = http://git-wip-us.apache.org/repos/asf/beam/blob/1a3292fb/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java index 627cfa6..f5a9087 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java @@ -24,8 +24,8 @@ import java.util.Collections; import java.util.Comparator; import java.util.Iterator; import java.util.Map; -import org.apache.beam.runners.core.PerKeyCombineFnRunner; -import org.apache.beam.runners.flink.PerKeyCombineFnRunners; +import org.apache.beam.runners.flink.OldPerKeyCombineFnRunner; +import org.apache.beam.runners.flink.OldPerKeyCombineFnRunners; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineFnBase; @@ -97,8 +97,8 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind sideInputs, out ); - PerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner = - PerKeyCombineFnRunners.create(combineFn); + OldPerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner = + OldPerKeyCombineFnRunners.create(combineFn); @SuppressWarnings("unchecked") OutputTimeFn<? super BoundedWindow> outputTimeFn = http://git-wip-us.apache.org/repos/asf/beam/blob/1a3292fb/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java index de0d416..a3fa0d4 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java @@ -26,8 +26,8 @@ import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; -import org.apache.beam.runners.core.PerKeyCombineFnRunner; -import org.apache.beam.runners.flink.PerKeyCombineFnRunners; +import org.apache.beam.runners.flink.OldPerKeyCombineFnRunner; +import org.apache.beam.runners.flink.OldPerKeyCombineFnRunners; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineFnBase; @@ -101,8 +101,8 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow> sideInputs, out ); - PerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner = - PerKeyCombineFnRunners.create(combineFn); + OldPerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner = + OldPerKeyCombineFnRunners.create(combineFn); @SuppressWarnings("unchecked") OutputTimeFn<? super BoundedWindow> outputTimeFn =