Refactor: move the isTick check out of Tuple/TupleImpl into the TupleUtils utility class
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/59fb8ded Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/59fb8ded Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/59fb8ded Branch: refs/heads/master Commit: 59fb8ded9ef892792879efc94b12ea46ff924c41 Parents: 4d2804a Author: Niels Basjes <[email protected]> Authored: Mon Dec 15 10:56:05 2014 +0100 Committer: Niels Basjes <[email protected]> Committed: Mon Dec 15 10:56:05 2014 +0100 ---------------------------------------------------------------------- .../storm/starter/bolt/AbstractRankerBolt.java | 3 +- .../storm/starter/bolt/RollingCountBolt.java | 3 +- .../storm/starter/tools/MockTupleHelpers.java | 6 ---- .../src/jvm/storm/kafka/bolt/KafkaBolt.java | 4 +-- .../src/jvm/backtype/storm/tuple/Tuple.java | 5 --- .../src/jvm/backtype/storm/tuple/TupleImpl.java | 5 --- .../jvm/backtype/storm/utils/TupleUtils.java | 35 ++++++++++++++++++++ .../trident/topology/TridentBoltExecutor.java | 3 +- 8 files changed, 43 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/59fb8ded/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java index 83c2cfc..64ceb29 100644 --- a/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java +++ b/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java @@ -24,6 +24,7 @@ import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; +import backtype.storm.utils.TupleUtils; import org.apache.log4j.Logger; import storm.starter.tools.Rankings; 
@@ -77,7 +78,7 @@ public abstract class AbstractRankerBolt extends BaseBasicBolt { */ @Override public final void execute(Tuple tuple, BasicOutputCollector collector) { - if (tuple.isTick()) { + if (TupleUtils.isTick(tuple)) { getLogger().debug("Received tick tuple, triggering emit of current rankings"); emitRankings(collector); } http://git-wip-us.apache.org/repos/asf/storm/blob/59fb8ded/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java index f023c0b..31f7ee2 100644 --- a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java +++ b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java @@ -25,6 +25,7 @@ import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; +import backtype.storm.utils.TupleUtils; import org.apache.log4j.Logger; import storm.starter.tools.NthLastModifiedTimeTracker; import storm.starter.tools.SlidingWindowCounter; @@ -94,7 +95,7 @@ public class RollingCountBolt extends BaseRichBolt { @Override public void execute(Tuple tuple) { - if (tuple.isTick()) { + if (TupleUtils.isTick(tuple)) { LOG.debug("Received tick tuple, triggering emit of current window counts"); emitCurrentWindowCounts(); } http://git-wip-us.apache.org/repos/asf/storm/blob/59fb8ded/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java index 9e8629c..eeaeeae 100644 --- a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java +++ 
b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java @@ -35,13 +35,7 @@ public final class MockTupleHelpers { Tuple tuple = mock(Tuple.class); when(tuple.getSourceComponent()).thenReturn(componentId); when(tuple.getSourceStreamId()).thenReturn(streamId); - when(tuple.isTick()).thenReturn(isTick(componentId, streamId)); return tuple; } - private static boolean isTick(String componentId, String streamId) { - return componentId.equals(Constants.SYSTEM_COMPONENT_ID) && - streamId.equals(Constants.SYSTEM_TICK_STREAM_ID); - } - } http://git-wip-us.apache.org/repos/asf/storm/blob/59fb8ded/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java index 7de25db..35c0da6 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java +++ b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java @@ -22,6 +22,7 @@ import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Tuple; +import backtype.storm.utils.TupleUtils; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; @@ -89,8 +90,7 @@ public class KafkaBolt<K, V> extends BaseRichBolt { @Override public void execute(Tuple input) { - if (input.isTick()) { - collector.ack(input); + if (TupleUtils.isTick(input)) { return; // Do not try to send ticks to Kafka } http://git-wip-us.apache.org/repos/asf/storm/blob/59fb8ded/storm-core/src/jvm/backtype/storm/tuple/Tuple.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/tuple/Tuple.java b/storm-core/src/jvm/backtype/storm/tuple/Tuple.java index b3f5e56..c644fec 100644 --- 
a/storm-core/src/jvm/backtype/storm/tuple/Tuple.java +++ b/storm-core/src/jvm/backtype/storm/tuple/Tuple.java @@ -165,11 +165,6 @@ public interface Tuple { public String getSourceStreamId(); /** - * Returns if this tuple is a tick tuple or not. - */ - public boolean isTick(); - - /** * Gets the message id that associated with this tuple. */ public MessageId getMessageId(); http://git-wip-us.apache.org/repos/asf/storm/blob/59fb8ded/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java index 40ad11c..7829327 100644 --- a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java +++ b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java @@ -214,11 +214,6 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed, return streamId; } - public boolean isTick() { - return Constants.SYSTEM_COMPONENT_ID.equals(this.getSourceComponent()) && - Constants.SYSTEM_TICK_STREAM_ID.equals(this.getSourceStreamId()); - } - public MessageId getMessageId() { return id; } http://git-wip-us.apache.org/repos/asf/storm/blob/59fb8ded/storm-core/src/jvm/backtype/storm/utils/TupleUtils.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/utils/TupleUtils.java b/storm-core/src/jvm/backtype/storm/utils/TupleUtils.java new file mode 100644 index 0000000..f9fb2c0 --- /dev/null +++ b/storm-core/src/jvm/backtype/storm/utils/TupleUtils.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package backtype.storm.utils; + +import backtype.storm.Constants; +import backtype.storm.tuple.Tuple; + +public final class TupleUtils { + + private TupleUtils() { + // No instantiation + } + + public static boolean isTick(Tuple tuple) { + return tuple != null + && Constants.SYSTEM_COMPONENT_ID .equals(tuple.getSourceComponent()) + && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId()); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/59fb8ded/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java index da4c1a5..41741a1 100644 --- a/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java +++ b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java @@ -34,6 +34,7 @@ import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import backtype.storm.utils.RotatingMap; +import backtype.storm.utils.TupleUtils; import backtype.storm.utils.Utils; import java.io.Serializable; import java.util.Arrays; @@ -299,7 +300,7 @@ public class TridentBoltExecutor implements IRichBolt { @Override public void execute(Tuple tuple) { - if(tuple.isTick()) { + if (TupleUtils.isTick(tuple)) 
{ long now = System.currentTimeMillis(); if(now - _lastRotate > _messageTimeoutMs) { _batches.rotate();
