jstorm-runner: remove top level classes RunnerUtils and SingletonKeyedWorkItem.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/74ceac61 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/74ceac61 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/74ceac61 Branch: refs/heads/jstorm-runner Commit: 74ceac6173f78c76247b9ea4cb8179ca1ed9f62d Parents: 8265353 Author: Pei He <[email protected]> Authored: Fri Jul 14 15:40:23 2017 +0800 Committer: Pei He <[email protected]> Committed: Sat Aug 19 12:02:57 2017 +0800 ---------------------------------------------------------------------- .../translation/GroupByWindowExecutor.java | 45 +++++++++++++- .../runners/jstorm/translation/RunnerUtils.java | 51 ---------------- .../translation/SingletonKeyedWorkItem.java | 62 -------------------- .../jstorm/translation/TranslationContext.java | 13 +++- 4 files changed, 56 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/74ceac61/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java index bf6e1ad..1c858b7 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument; import com.google.common.collect.ImmutableList; import java.io.Serializable; +import java.util.Collections; import java.util.List; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; @@ -152,7 +153,7 @@ class GroupByWindowExecutor<K, V> * For GroupByKey, KV type elem is received. We need to convert the KV elem * into KeyedWorkItem first, which is the expected type in LateDataDroppingDoFnRunner. */ - KeyedWorkItem<K, V> keyedWorkItem = RunnerUtils.toKeyedWorkItem((WindowedValue<KV<K, V>>) elem); + KeyedWorkItem<K, V> keyedWorkItem = toKeyedWorkItem((WindowedValue<KV<K, V>>) elem); runner.processElement(elem.withValue(keyedWorkItem)); } @@ -170,4 +171,46 @@ class GroupByWindowExecutor<K, V> public String toString() { return super.toString(); } + + private <K, V> KeyedWorkItem<K, V> toKeyedWorkItem(WindowedValue<KV<K, V>> kvElem) { + SingletonKeyedWorkItem<K, V> workItem = SingletonKeyedWorkItem.of( + kvElem.getValue().getKey(), + kvElem.withValue(kvElem.getValue().getValue())); + return workItem; + } + + private static class SingletonKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, ElemT> { + + final K key; + final WindowedValue<ElemT> value; + + private SingletonKeyedWorkItem(K key, WindowedValue<ElemT> value) { + this.key = key; + this.value = value; + } + + public static <K, ElemT> SingletonKeyedWorkItem<K, ElemT> of( + K key, WindowedValue<ElemT> value) { + return new SingletonKeyedWorkItem<>(key, value); + } + + @Override + public K key() { + return key; + } + + public WindowedValue<ElemT> value() { + return value; + } + + @Override + public Iterable<TimerInternals.TimerData> timersIterable() { + return Collections.EMPTY_LIST; + } + + @Override + public Iterable<WindowedValue<ElemT>> elementsIterable() { + return Collections.singletonList(value); + } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/74ceac61/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/RunnerUtils.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/RunnerUtils.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/RunnerUtils.java deleted file mode 100644 index 4f469f3..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/RunnerUtils.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation; - -import org.apache.beam.runners.core.KeyedWorkItem; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.KV; - -/** - * Utils for JStorm runner. - */ -class RunnerUtils { - /** - * Convert {@link WindowedValue} into {@link KeyedWorkItem}. - * @param elem - * @return - */ - public static <K, V> KeyedWorkItem<K, V> toKeyedWorkItem(WindowedValue<KV<K, V>> elem) { - WindowedValue<KV<K, V>> kvElem = (WindowedValue<KV<K, V>>) elem; - SingletonKeyedWorkItem<K, V> workItem = SingletonKeyedWorkItem.of( - kvElem.getValue().getKey(), - kvElem.withValue(kvElem.getValue().getValue())); - return workItem; - } - - public static boolean isGroupByKeyExecutor(Executor executor) { - if (executor instanceof GroupByWindowExecutor) { - return true; - } else if (executor instanceof StatefulDoFnExecutor - || executor instanceof MultiStatefulDoFnExecutor) { - return true; - } else { - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/74ceac61/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/SingletonKeyedWorkItem.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/SingletonKeyedWorkItem.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/SingletonKeyedWorkItem.java deleted file mode 100644 index b321c76..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/SingletonKeyedWorkItem.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation; - -import java.util.Collections; -import org.apache.beam.runners.core.KeyedWorkItem; -import org.apache.beam.runners.core.TimerInternals; -import org.apache.beam.sdk.util.WindowedValue; - -/** - * Singleton keyed word item. - * @param <K> - * @param <ElemT> - */ -class SingletonKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, ElemT> { - - final K key; - final WindowedValue<ElemT> value; - - private SingletonKeyedWorkItem(K key, WindowedValue<ElemT> value) { - this.key = key; - this.value = value; - } - - public static <K, ElemT> SingletonKeyedWorkItem<K, ElemT> of(K key, WindowedValue<ElemT> value) { - return new SingletonKeyedWorkItem<K, ElemT>(key, value); - } - - @Override - public K key() { - return key; - } - - public WindowedValue<ElemT> value() { - return value; - } - - @Override - public Iterable<TimerInternals.TimerData> timersIterable() { - return Collections.EMPTY_LIST; - } - - @Override - public Iterable<WindowedValue<ElemT>> elementsIterable() { - return Collections.singletonList(value); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/74ceac61/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java index b84fd4a..e25f211 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java @@ -158,7 +158,7 @@ public class TranslationContext { * d) For the purpose of performance to reduce the side effects between multiple streams which * is output to same executor, a new bolt will be created. */ - if (RunnerUtils.isGroupByKeyExecutor(executor)) { + if (isGroupByKeyExecutor(executor)) { bolt = new ExecutorsBolt(); name = executionGraphContext.registerBolt(bolt); isGBK = true; @@ -435,4 +435,15 @@ public class TranslationContext { return id++; } } + + private boolean isGroupByKeyExecutor(Executor executor) { + if (executor instanceof GroupByWindowExecutor) { + return true; + } else if (executor instanceof StatefulDoFnExecutor + || executor instanceof MultiStatefulDoFnExecutor) { + return true; + } else { + return false; + } + } }
