jstorm-runner: remove AdaptorBasicBolt and AdaptorBasicSpout.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9309ac49 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9309ac49 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9309ac49 Branch: refs/heads/jstorm-runner Commit: 9309ac49d81e1d6dfd694ec885cdb12a3db53483 Parents: 5a15d54 Author: Pei He <[email protected]> Authored: Fri Jul 14 14:50:47 2017 +0800 Committer: Pei He <[email protected]> Committed: Sat Aug 19 12:02:57 2017 +0800 ---------------------------------------------------------------------- .../beam/runners/jstorm/JStormRunner.java | 16 +++--------- .../jstorm/translation/TranslationContext.java | 12 ++++----- .../translation/runtime/AdaptorBasicBolt.java | 27 -------------------- .../translation/runtime/AdaptorBasicSpout.java | 27 -------------------- .../translation/runtime/ExecutorsBolt.java | 3 ++- .../runtime/UnboundedSourceSpout.java | 3 ++- 6 files changed, 14 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/9309ac49/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java index 00ec7f6..8782130 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java @@ -41,8 +41,6 @@ import org.apache.beam.runners.jstorm.serialization.UnmodifiableCollectionsSeria import org.apache.beam.runners.jstorm.translation.JStormPipelineTranslator; import org.apache.beam.runners.jstorm.translation.TranslationContext; import org.apache.beam.runners.jstorm.translation.runtime.AbstractComponent; -import org.apache.beam.runners.jstorm.translation.runtime.AdaptorBasicBolt; -import org.apache.beam.runners.jstorm.translation.runtime.AdaptorBasicSpout; import org.apache.beam.runners.jstorm.translation.runtime.ExecutorsBolt; import org.apache.beam.runners.jstorm.translation.runtime.TxExecutorsBolt; import org.apache.beam.runners.jstorm.translation.runtime.TxUnboundedSourceSpout; @@ -155,18 +153,12 @@ public class JStormRunner extends PipelineRunner<JStormRunnerResult> { private AbstractComponent getComponent( String id, TranslationContext.ExecutionGraphContext context) { - AbstractComponent component = null; - AdaptorBasicSpout spout = context.getSpout(id); + AbstractComponent spout = context.getSpout(id); if (spout != null) { - component = spout; + return spout; } else { - AdaptorBasicBolt bolt = context.getBolt(id); - if (bolt != null) { - component = bolt; - } + return context.getBolt(id); } - - return component; } private StormTopology getTopology( @@ -176,7 +168,7 @@ public class JStormRunner extends PipelineRunner<JStormRunnerResult> { isExactlyOnce ? new TransactionTopologyBuilder() : new TopologyBuilder(); int parallelismNumber = options.getParallelismNumber(); - Map<String, AdaptorBasicSpout> spouts = context.getSpouts(); + Map<String, UnboundedSourceSpout> spouts = context.getSpouts(); for (String id : spouts.keySet()) { IRichSpout spout = getSpout(isExactlyOnce, spouts.get(id)); builder.setSpout(id, spout, getParallelismNum(spouts.get(id), parallelismNumber)); http://git-wip-us.apache.org/repos/asf/beam/blob/9309ac49/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java index 1230a31..28d102d 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java @@ -34,9 +34,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.beam.runners.jstorm.JStormPipelineOptions; -import org.apache.beam.runners.jstorm.translation.runtime.AdaptorBasicSpout; import org.apache.beam.runners.jstorm.translation.runtime.Executor; import org.apache.beam.runners.jstorm.translation.runtime.ExecutorsBolt; +import org.apache.beam.runners.jstorm.translation.runtime.UnboundedSourceSpout; import org.apache.beam.runners.jstorm.translation.translator.Stream; import org.apache.beam.runners.jstorm.translation.util.CommonInstance; import org.apache.beam.runners.jstorm.util.RunnerUtils; @@ -333,7 +333,7 @@ public class TranslationContext { */ public static class ExecutionGraphContext { - private final Map<String, AdaptorBasicSpout> spoutMap = new HashMap<>(); + private final Map<String, UnboundedSourceSpout> spoutMap = new HashMap<>(); private final Map<String, ExecutorsBolt> boltMap = new HashMap<>(); // One-to-one mapping between Stream.Producer and TaggedPValue (or PValue). @@ -344,7 +344,7 @@ public class TranslationContext { private int id = 1; - public void registerSpout(AdaptorBasicSpout spout, TaggedPValue output) { + public void registerSpout(UnboundedSourceSpout spout, TaggedPValue output) { checkNotNull(spout, "spout"); checkNotNull(output, "output"); String name = "spout" + genId(); @@ -354,14 +354,14 @@ public class TranslationContext { Stream.Producer.of(name, output.getTag().getId(), output.getValue().getName())); } - public AdaptorBasicSpout getSpout(String id) { + public UnboundedSourceSpout getSpout(String id) { if (Strings.isNullOrEmpty(id)) { return null; } return this.spoutMap.get(id); } - public Map<String, AdaptorBasicSpout> getSpouts() { + public Map<String, UnboundedSourceSpout> getSpouts() { return this.spoutMap; } @@ -418,7 +418,7 @@ public class TranslationContext { public String toString() { List<String> ret = new ArrayList<>(); ret.add("SPOUT"); - for (Map.Entry<String, AdaptorBasicSpout> entry : spoutMap.entrySet()) { + for (Map.Entry<String, UnboundedSourceSpout> entry : spoutMap.entrySet()) { ret.add(entry.getKey() + ": " + entry.getValue().toString()); } ret.add("BOLT"); http://git-wip-us.apache.org/repos/asf/beam/blob/9309ac49/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java deleted file mode 100644 index d8d4d46..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation.runtime; - -import backtype.storm.topology.IRichBatchBolt; - -/** - * Adaptor bolt of JStorm extends {@link AbstractComponent}. - */ -public abstract class AdaptorBasicBolt extends AbstractComponent implements IRichBatchBolt { - -} http://git-wip-us.apache.org/repos/asf/beam/blob/9309ac49/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java deleted file mode 100644 index 814d416..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation.runtime; - -import backtype.storm.topology.IRichSpout; - -/** - * Adaptor bolt of JStorm extends {@link AbstractComponent}. - */ -public abstract class AdaptorBasicSpout extends AbstractComponent implements IRichSpout { - -} http://git-wip-us.apache.org/repos/asf/beam/blob/9309ac49/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java index d33c17a..0366c13 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; +import backtype.storm.topology.IRichBatchBolt; import backtype.storm.tuple.ITupleExt; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; @@ -52,7 +53,7 @@ import org.slf4j.LoggerFactory; /** * ExecutorsBolt is a JStorm Bolt composited with several executors chained in a sub-DAG. */ -public class ExecutorsBolt extends AdaptorBasicBolt { +public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt { private static final long serialVersionUID = -7751043327801735211L; private static final Logger LOG = LoggerFactory.getLogger(ExecutorsBolt.class); http://git-wip-us.apache.org/repos/asf/beam/blob/9309ac49/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java index 006cd47..690824d 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; +import backtype.storm.topology.IRichSpout; import backtype.storm.tuple.Values; import com.alibaba.jstorm.utils.KryoSerializer; import java.io.IOException; @@ -43,7 +44,7 @@ import org.slf4j.LoggerFactory; * Spout implementation that wraps a Beam UnboundedSource. * TODO: add wrapper to support metrics in UnboundedSource. */ -public class UnboundedSourceSpout extends AdaptorBasicSpout { +public class UnboundedSourceSpout extends AbstractComponent implements IRichSpout { private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceSpout.class); private final String description;
