Repository: storm Updated Branches: refs/heads/master a7cdfefd7 -> b5f02d4e4
http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/topology/StatefulWindowedBoltExecutor.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/topology/StatefulWindowedBoltExecutor.java b/storm-core/src/jvm/org/apache/storm/topology/StatefulWindowedBoltExecutor.java index aff46e6..2fc6f8d 100644 --- a/storm-core/src/jvm/org/apache/storm/topology/StatefulWindowedBoltExecutor.java +++ b/storm-core/src/jvm/org/apache/storm/topology/StatefulWindowedBoltExecutor.java @@ -207,14 +207,14 @@ public class StatefulWindowedBoltExecutor<T extends State> extends WindowedBoltE } @Override - public void onActivation(List<Tuple> events, List<Tuple> newEvents, List<Tuple> expired) { + public void onActivation(List<Tuple> events, List<Tuple> newEvents, List<Tuple> expired, Long timestamp) { if (isRecovering()) { String msg = String.format("Unexpected activation with events %s, newEvents %s, expired %s in recovering state. " + "recoveryStates %s ", events, newEvents, expired, recoveryStates); LOG.error(msg); throw new IllegalStateException(msg); } else { - parentListener.onActivation(events, newEvents, expired); + parentListener.onActivation(events, newEvents, expired, timestamp); updateWindowState(expired, newEvents); } } http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java b/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java index fd98274..b592e0b 100644 --- a/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java +++ b/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java @@ -327,9 +327,9 @@ public class WindowedBoltExecutor implements IRichBolt { } @Override - public void onActivation(List<Tuple> tuples, List<Tuple> newTuples, List<Tuple> expiredTuples) { + public void onActivation(List<Tuple> tuples, List<Tuple> newTuples, List<Tuple> expiredTuples, Long timestamp) { windowedOutputCollector.setContext(tuples); - bolt.execute(new TupleWindowImpl(tuples, newTuples, expiredTuples)); + bolt.execute(new TupleWindowImpl(tuples, newTuples, expiredTuples, timestamp)); } }; } http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java b/storm-core/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java index 2e2af0d..ba97c26 100644 --- a/storm-core/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java +++ b/storm-core/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java @@ -27,6 +27,7 @@ import org.apache.storm.windowing.TimestampExtractor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Serializable; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -40,7 +41,7 @@ public abstract class BaseWindowedBolt implements IWindowedBolt { /** * Holds a count value for count based windows and sliding intervals. */ - public static class Count { + public static class Count implements Serializable { public final int value; public Count(int value) { @@ -58,6 +59,22 @@ public abstract class BaseWindowedBolt implements IWindowedBolt { } @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + Count count = (Count) o; + + return value == count.value; + + } + + @Override + public int hashCode() { + return value; + } + + @Override public String toString() { return "Count{" + "value=" + value + @@ -68,7 +85,7 @@ public abstract class BaseWindowedBolt implements IWindowedBolt { /** * Holds a Time duration for time based windows and sliding intervals. */ - public static class Duration { + public static class Duration implements Serializable { public final int value; public Duration(int value, TimeUnit timeUnit) { @@ -126,6 +143,22 @@ public abstract class BaseWindowedBolt implements IWindowedBolt { } @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + Duration duration = (Duration) o; + + return value == duration.value; + + } + + @Override + public int hashCode() { + return value; + } + + @Override public String toString() { return "Duration{" + "value=" + value + http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java index f93527a..a8fbb41 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java @@ -121,7 +121,7 @@ public abstract class AbstractTridentWindowManager<T> implements ITridentWindowM } @Override - public void onActivation(List<T> events, List<T> newEvents, List<T> expired) { + public void onActivation(List<T> events, List<T> newEvents, List<T> expired, Long timestamp) { LOG.debug("onActivation is invoked with events size: [{}]", events.size()); // trigger occurred, create an aggregation and keep them in store int currentTriggerId = triggerId.incrementAndGet(); http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/windowing/CountEvictionPolicy.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/windowing/CountEvictionPolicy.java b/storm-core/src/jvm/org/apache/storm/windowing/CountEvictionPolicy.java index fb12202..6a9a4f8 100644 --- a/storm-core/src/jvm/org/apache/storm/windowing/CountEvictionPolicy.java +++ b/storm-core/src/jvm/org/apache/storm/windowing/CountEvictionPolicy.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicLong; public class CountEvictionPolicy<T> implements EvictionPolicy<T> { protected final int threshold; protected final AtomicLong currentCount; + private EvictionContext context; public CountEvictionPolicy(int count) { this.threshold = count; @@ -62,7 +63,12 @@ public class CountEvictionPolicy<T> implements EvictionPolicy<T> { @Override public void setContext(EvictionContext context) { - // NOOP + this.context = context; + } + + @Override + public EvictionContext getContext() { + return context; } @Override http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/windowing/EvictionPolicy.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/windowing/EvictionPolicy.java b/storm-core/src/jvm/org/apache/storm/windowing/EvictionPolicy.java index 05e4d93..774d0a3 100644 --- a/storm-core/src/jvm/org/apache/storm/windowing/EvictionPolicy.java +++ b/storm-core/src/jvm/org/apache/storm/windowing/EvictionPolicy.java @@ -72,4 +72,11 @@ public interface EvictionPolicy<T> { */ void setContext(EvictionContext context); + /** + * Returns the current context that is part of this eviction policy + * + * @return the eviction context + */ + EvictionContext getContext(); + } http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java b/storm-core/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java index e646207..802e6bb 100644 --- a/storm-core/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java +++ b/storm-core/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java @@ -22,11 +22,7 @@ package org.apache.storm.windowing; */ public class TimeEvictionPolicy<T> implements EvictionPolicy<T> { private final int windowLength; - /** - * The reference time in millis for window calculations and - * expiring events. If not set it will default to System.currentTimeMillis() - */ - protected Long referenceTime; + protected EvictionContext evictionContext; /** * Constructs a TimeEvictionPolicy that evicts events older @@ -43,7 +39,7 @@ public class TimeEvictionPolicy<T> implements EvictionPolicy<T> { */ @Override public Action evict(Event<T> event) { - long now = referenceTime == null ? System.currentTimeMillis() : referenceTime; + long now = evictionContext == null ? System.currentTimeMillis() : evictionContext.getReferenceTime(); long diff = now - event.getTimestamp(); if (diff >= windowLength) { return Action.EXPIRE; @@ -58,14 +54,19 @@ public class TimeEvictionPolicy<T> implements EvictionPolicy<T> { @Override public void setContext(EvictionContext context) { - referenceTime = context.getReferenceTime(); + this.evictionContext = context; + } + + @Override + public EvictionContext getContext() { + return evictionContext; } @Override public String toString() { return "TimeEvictionPolicy{" + "windowLength=" + windowLength + - ", referenceTime=" + referenceTime + + ", evictionContext=" + evictionContext + '}'; } } http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/windowing/TupleWindowImpl.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/windowing/TupleWindowImpl.java b/storm-core/src/jvm/org/apache/storm/windowing/TupleWindowImpl.java index 2560d25..1e8b022 100644 --- a/storm-core/src/jvm/org/apache/storm/windowing/TupleWindowImpl.java +++ b/storm-core/src/jvm/org/apache/storm/windowing/TupleWindowImpl.java @@ -28,11 +28,17 @@ public class TupleWindowImpl implements TupleWindow { private final List<Tuple> tuples; private final List<Tuple> newTuples; private final List<Tuple> expiredTuples; + private final Long timestamp; public TupleWindowImpl(List<Tuple> tuples, List<Tuple> newTuples, List<Tuple> expiredTuples) { + this(tuples, newTuples, expiredTuples, null); + } + + public TupleWindowImpl(List<Tuple> tuples, List<Tuple> newTuples, List<Tuple> expiredTuples, Long timestamp) { this.tuples = tuples; this.newTuples = newTuples; this.expiredTuples = expiredTuples; + this.timestamp = timestamp; } @Override @@ -51,6 +57,11 @@ public class TupleWindowImpl implements TupleWindow { } @Override + public Long getTimestamp() { + return timestamp != null ? timestamp : System.currentTimeMillis(); + } + + @Override public String toString() { return "TupleWindowImpl{" + "tuples=" + tuples + http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java b/storm-core/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java index 74240bb..7304366 100644 --- a/storm-core/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java +++ b/storm-core/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java @@ -24,11 +24,6 @@ package org.apache.storm.windowing; * @param <T> the type of event tracked by this policy. */ public class WatermarkCountEvictionPolicy<T> extends CountEvictionPolicy<T> { - /* - * The reference time in millis for window calculations and - * expiring events. If not set it will default to System.currentTimeMillis() - */ - private long referenceTime; private long processed = 0L; public WatermarkCountEvictionPolicy(int count) { @@ -38,7 +33,7 @@ public class WatermarkCountEvictionPolicy<T> extends CountEvictionPolicy<T> { @Override public Action evict(Event<T> event) { Action action; - if (event.getTimestamp() <= referenceTime && processed < currentCount.get()) { + if (event.getTimestamp() <= super.getContext().getReferenceTime() && processed < currentCount.get()) { action = super.evict(event); if (action == Action.PROCESS) { ++processed; @@ -56,7 +51,7 @@ public class WatermarkCountEvictionPolicy<T> extends CountEvictionPolicy<T> { @Override public void setContext(EvictionContext context) { - referenceTime = context.getReferenceTime(); + super.setContext(context); if (context.getCurrentCount() != null) { currentCount.set(context.getCurrentCount()); } else { @@ -68,7 +63,6 @@ public class WatermarkCountEvictionPolicy<T> extends CountEvictionPolicy<T> { @Override public String toString() { return "WatermarkCountEvictionPolicy{" + - "referenceTime=" + referenceTime + "} " + super.toString(); } } http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/windowing/WatermarkTimeEvictionPolicy.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/windowing/WatermarkTimeEvictionPolicy.java b/storm-core/src/jvm/org/apache/storm/windowing/WatermarkTimeEvictionPolicy.java index 53361d2..e5ecba4 100644 --- a/storm-core/src/jvm/org/apache/storm/windowing/WatermarkTimeEvictionPolicy.java +++ b/storm-core/src/jvm/org/apache/storm/windowing/WatermarkTimeEvictionPolicy.java @@ -45,7 +45,6 @@ public class WatermarkTimeEvictionPolicy<T> extends TimeEvictionPolicy<T> { */ public WatermarkTimeEvictionPolicy(int windowLength, int lag) { super(windowLength); - referenceTime = 0L; this.lag = lag; } @@ -58,7 +57,8 @@ public class WatermarkTimeEvictionPolicy<T> extends TimeEvictionPolicy<T> { */ @Override public Action evict(Event<T> event) { - long diff = referenceTime - event.getTimestamp(); + long referenceTime = evictionContext.getReferenceTime() != null ? evictionContext.getReferenceTime() : 0L; + long diff = referenceTime - event.getTimestamp(); if (diff < -lag) { return Action.STOP; } else if (diff < 0) { http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/windowing/Window.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/windowing/Window.java b/storm-core/src/jvm/org/apache/storm/windowing/Window.java index 8382448..9a62eef 100644 --- a/storm-core/src/jvm/org/apache/storm/windowing/Window.java +++ b/storm-core/src/jvm/org/apache/storm/windowing/Window.java @@ -45,4 +45,11 @@ public interface Window<T> { * @return the list of events expired from the window. */ List<T> getExpired(); + + /** + * If processing based on event time, returns the watermark time otherwise the current timestamp. + * + * @return the window timestamp + */ + Long getTimestamp(); } http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/windowing/WindowLifecycleListener.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/windowing/WindowLifecycleListener.java b/storm-core/src/jvm/org/apache/storm/windowing/WindowLifecycleListener.java index 03c0213..ea2c997 100644 --- a/storm-core/src/jvm/org/apache/storm/windowing/WindowLifecycleListener.java +++ b/storm-core/src/jvm/org/apache/storm/windowing/WindowLifecycleListener.java @@ -37,6 +37,7 @@ public interface WindowLifecycleListener<T> { * @param events the list of current events in the window. * @param newEvents the newly added events since last activation. * @param expired the expired events since last activation. + * @param referenceTime the reference (event or processing) time that resulted in activation */ - void onActivation(List<T> events, List<T> newEvents, List<T> expired); + void onActivation(List<T> events, List<T> newEvents, List<T> expired, Long referenceTime); } http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/windowing/WindowManager.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/windowing/WindowManager.java b/storm-core/src/jvm/org/apache/storm/windowing/WindowManager.java index 792509e..8021ba8 100644 --- a/storm-core/src/jvm/org/apache/storm/windowing/WindowManager.java +++ b/storm-core/src/jvm/org/apache/storm/windowing/WindowManager.java @@ -142,7 +142,7 @@ public class WindowManager<T> implements TriggerHandler { if (!events.isEmpty()) { prevWindowEvents.addAll(windowEvents); LOG.debug("invoking windowLifecycleListener onActivation, [{}] events in window.", events.size()); - windowLifecycleListener.onActivation(events, newEvents, expired); + windowLifecycleListener.onActivation(events, newEvents, expired, evictionPolicy.getContext().getReferenceTime()); } else { LOG.debug("No events in the window, skipping onActivation"); } http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/test/jvm/org/apache/storm/streams/ProcessorBoltTest.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/streams/ProcessorBoltTest.java b/storm-core/test/jvm/org/apache/storm/streams/ProcessorBoltTest.java new file mode 100644 index 0000000..e9d5127 --- /dev/null +++ b/storm-core/test/jvm/org/apache/storm/streams/ProcessorBoltTest.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.streams; + +import com.google.common.collect.Multimap; +import org.apache.storm.streams.operations.aggregators.Sum; +import org.apache.storm.streams.processors.AggregateProcessor; +import org.apache.storm.streams.processors.FilterProcessor; +import org.apache.storm.streams.processors.Processor; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.jgrapht.DirectedGraph; +import org.jgrapht.graph.DefaultDirectedGraph; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Set; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +/** + * Unit tests for {@link ProcessorBolt} + */ +public class ProcessorBoltTest { + TopologyContext mockTopologyContext; + OutputCollector mockOutputCollector; + ProcessorBolt bolt; + Tuple mockTuple1; + Tuple mockTuple2; + Tuple mockTuple3; + Tuple punctuation; + Multimap<String, ProcessorNode> mockStreamToProcessors; + DirectedGraph<Node, Edge> graph; + + @Before + public void setUp() throws Exception { + mockTopologyContext = Mockito.mock(TopologyContext.class); + mockOutputCollector = Mockito.mock(OutputCollector.class); + mockTuple1 = Mockito.mock(Tuple.class); + mockTuple2 = Mockito.mock(Tuple.class); + mockTuple3 = Mockito.mock(Tuple.class); + setUpMockTuples(mockTuple1, mockTuple2, mockTuple3); + punctuation = Mockito.mock(Tuple.class); + setUpPunctuation(punctuation); + mockStreamToProcessors = Mockito.mock(Multimap.class); + graph = new DefaultDirectedGraph(new StreamsEdgeFactory()); + + } + + @Test + public void testEmitAndAck() throws Exception { + setUpProcessorBolt(new FilterProcessor<Integer>(x -> true)); + bolt.execute(mockTuple1); + ArgumentCaptor<Collection> anchor = ArgumentCaptor.forClass(Collection.class); + ArgumentCaptor<Values> values = ArgumentCaptor.forClass(Values.class); + ArgumentCaptor<String> os = ArgumentCaptor.forClass(String.class); + Mockito.verify(mockOutputCollector).emit(os.capture(), anchor.capture(), values.capture()); + assertEquals("outputstream", os.getValue()); + assertArrayEquals(new Object[]{mockTuple1}, anchor.getValue().toArray()); + assertEquals(new Values(100), values.getValue()); + Mockito.verify(mockOutputCollector, Mockito.times(1)).ack(mockTuple1); + } + + @Test + public void testAggResultAndAck() throws Exception { + setUpProcessorBolt(new AggregateProcessor<>(new Sum()), Collections.singleton("inputstream"), true, null); + bolt.execute(mockTuple2); + bolt.execute(mockTuple3); + bolt.execute(punctuation); + ArgumentCaptor<Collection> anchor = ArgumentCaptor.forClass(Collection.class); + ArgumentCaptor<Values> values = ArgumentCaptor.forClass(Values.class); + ArgumentCaptor<String> os = ArgumentCaptor.forClass(String.class); + Mockito.verify(mockOutputCollector, Mockito.times(2)).emit(os.capture(), anchor.capture(), values.capture()); + assertArrayEquals(new Object[]{mockTuple2, mockTuple3, punctuation}, anchor.getAllValues().get(0).toArray()); + assertArrayEquals(new Object[]{mockTuple2, mockTuple3, punctuation}, anchor.getAllValues().get(1).toArray()); + assertArrayEquals(new Object[]{new Values(200L), new Values("__punctuation")}, values.getAllValues().toArray()); + assertArrayEquals(new Object[]{"outputstream", "outputstream"}, os.getAllValues().toArray()); + Mockito.verify(mockOutputCollector).ack(mockTuple2); + Mockito.verify(mockOutputCollector).ack(mockTuple3); + Mockito.verify(mockOutputCollector).ack(punctuation); + } + + @Test + public void testEmitTs() throws Exception { + Tuple tupleWithTs = Mockito.mock(Tuple.class); + setUpMockTuples(tupleWithTs); + Mockito.when(tupleWithTs.getLongByField("ts")).thenReturn(12345L); + setUpProcessorBolt(new FilterProcessor(x -> true), "ts"); + bolt.execute(tupleWithTs); + ArgumentCaptor<Collection> anchor = ArgumentCaptor.forClass(Collection.class); + ArgumentCaptor<Values> values = ArgumentCaptor.forClass(Values.class); + ArgumentCaptor<String> os = ArgumentCaptor.forClass(String.class); + Mockito.verify(mockOutputCollector).emit(os.capture(), anchor.capture(), values.capture()); + assertEquals("outputstream", os.getValue()); + assertArrayEquals(new Object[]{tupleWithTs}, anchor.getValue().toArray()); + assertEquals(new Values(100, 12345L), values.getValue()); + Mockito.verify(mockOutputCollector, Mockito.times(1)).ack(tupleWithTs); + } + + private void setUpProcessorBolt(Processor<?> processor) { + setUpProcessorBolt(processor, Collections.emptySet(), false, null); + } + + private void setUpProcessorBolt(Processor<?> processor, String tsFieldName) { + setUpProcessorBolt(processor, Collections.emptySet(), false, tsFieldName); + } + + private void setUpProcessorBolt(Processor<?> processor, + Set<String> windowedParentStreams, + boolean isWindowed, + String tsFieldName) { + ProcessorNode node = new ProcessorNode(processor, "outputstream", new Fields("value")); + node.setWindowedParentStreams(windowedParentStreams); + node.setWindowed(isWindowed); + Mockito.when(mockStreamToProcessors.get(Mockito.anyString())).thenReturn(Collections.singletonList(node)); + graph.addVertex(node); + bolt = new ProcessorBolt("bolt1", graph, Collections.singletonList(node)); + if (tsFieldName != null && !tsFieldName.isEmpty()) { + bolt.setTimestampField(tsFieldName); + } + bolt.setStreamToInitialProcessors(mockStreamToProcessors); + bolt.prepare(new HashMap<>(), mockTopologyContext, mockOutputCollector); + } + + private void setUpMockTuples(Tuple... tuples) { + for (Tuple tuple : tuples) { + Mockito.when(tuple.size()).thenReturn(1); + Mockito.when(tuple.getValue(0)).thenReturn(100); + Mockito.when(tuple.getSourceComponent()).thenReturn("bolt0"); + Mockito.when(tuple.getSourceStreamId()).thenReturn("inputstream"); + } + } + + private void setUpPunctuation(Tuple punctuation) { + Mockito.when(punctuation.size()).thenReturn(1); + Mockito.when(punctuation.getValue(0)).thenReturn(WindowNode.PUNCTUATION); + Mockito.when(punctuation.getSourceComponent()).thenReturn("bolt0"); + Mockito.when(punctuation.getSourceStreamId()).thenReturn("inputstream"); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/test/jvm/org/apache/storm/streams/StatefulProcessorBoltTest.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/streams/StatefulProcessorBoltTest.java b/storm-core/test/jvm/org/apache/storm/streams/StatefulProcessorBoltTest.java new file mode 100644 index 0000000..dbc7e27 --- /dev/null +++ b/storm-core/test/jvm/org/apache/storm/streams/StatefulProcessorBoltTest.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.streams; + +import com.google.common.collect.Multimap; +import org.apache.storm.state.KeyValueState; +import org.apache.storm.streams.operations.aggregators.Count; +import org.apache.storm.streams.processors.Processor; +import org.apache.storm.streams.processors.UpdateStateByKeyProcessor; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.jgrapht.DirectedGraph; +import org.jgrapht.graph.DefaultDirectedGraph; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +/** + * Unit tests for {@link StatefulProcessorBolt} + */ +public class StatefulProcessorBoltTest { + TopologyContext mockTopologyContext; + OutputCollector mockOutputCollector; + StatefulProcessorBolt<String, Long> bolt; + Tuple mockTuple1; + DirectedGraph<Node, Edge> graph; + Multimap<String, ProcessorNode> mockStreamToProcessors; + KeyValueState<String, Long> mockKeyValueState; + + @Before + public void setUp() throws Exception { + mockTopologyContext = Mockito.mock(TopologyContext.class); + mockOutputCollector = Mockito.mock(OutputCollector.class); + mockTuple1 = Mockito.mock(Tuple.class); + mockStreamToProcessors = Mockito.mock(Multimap.class); + mockKeyValueState = Mockito.mock(KeyValueState.class); + setUpMockTuples(mockTuple1); + } + + @Test + public void testEmitAndAck() throws Exception { + setUpStatefulProcessorBolt(new UpdateStateByKeyProcessor<>(new Count<>())); + bolt.execute(mockTuple1); + ArgumentCaptor<Collection> anchor = ArgumentCaptor.forClass(Collection.class); + ArgumentCaptor<Values> values = ArgumentCaptor.forClass(Values.class); + ArgumentCaptor<String> os = ArgumentCaptor.forClass(String.class); + Mockito.verify(mockOutputCollector).emit(os.capture(), anchor.capture(), values.capture()); + assertEquals("outputstream", os.getValue()); + assertArrayEquals(new Object[]{mockTuple1}, anchor.getValue().toArray()); + assertEquals(new Values("k", 1L), values.getValue()); + Mockito.verify(mockOutputCollector, Mockito.times(1)).ack(mockTuple1); + Mockito.verify(mockKeyValueState, Mockito.times(1)).put("k", 1L ); + } + + private void setUpStatefulProcessorBolt(Processor<?> processor) { + ProcessorNode node = new ProcessorNode(processor, "outputstream", new Fields("value")); + Mockito.when(mockStreamToProcessors.get(Mockito.anyString())).thenReturn(Collections.singletonList(node)); + graph = new DefaultDirectedGraph(new StreamsEdgeFactory()); + graph.addVertex(node); + bolt = new StatefulProcessorBolt<>("bolt1", graph, Collections.singletonList(node)); + bolt.setStreamToInitialProcessors(mockStreamToProcessors); + bolt.prepare(new HashMap<>(), mockTopologyContext, mockOutputCollector); + bolt.initState(mockKeyValueState); + } + + private void setUpMockTuples(Tuple... tuples) { + for (Tuple tuple : tuples) { + Mockito.when(tuple.size()).thenReturn(1); + Mockito.when(tuple.getValue(0)).thenReturn(Pair.of("k", "v")); + Mockito.when(tuple.getSourceComponent()).thenReturn("bolt0"); + Mockito.when(tuple.getSourceStreamId()).thenReturn("inputstream"); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/test/jvm/org/apache/storm/streams/StreamBuilderTest.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/streams/StreamBuilderTest.java b/storm-core/test/jvm/org/apache/storm/streams/StreamBuilderTest.java new file mode 100644 index 0000000..1498ae4 --- /dev/null +++ b/storm-core/test/jvm/org/apache/storm/streams/StreamBuilderTest.java @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.streams; + +import org.apache.storm.generated.Bolt; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.generated.Grouping; +import org.apache.storm.generated.NullStruct; +import org.apache.storm.generated.SpoutSpec; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.streams.operations.aggregators.Count; +import org.apache.storm.streams.operations.mappers.PairValueMapper; +import org.apache.storm.streams.operations.mappers.ValueMapper; +import org.apache.storm.streams.processors.BranchProcessor; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IRichBolt; +import org.apache.storm.topology.IRichSpout; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.utils.Utils; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Unit tests for {@link StreamBuilder} + */ +public class StreamBuilderTest { + StreamBuilder streamBuilder; + + @Before + public void setUp() throws Exception { + streamBuilder = new StreamBuilder(); + UniqueIdGen.getInstance().reset(); + } + + @Test(expected = IllegalArgumentException.class) + public void testSpoutNoDefaultStream() throws Exception { + Stream<Tuple> stream = streamBuilder.newStream(newSpout("test")); + stream.filter(x -> true); + streamBuilder.build(); + } + + @Test + public void testSpoutToBolt() throws Exception { + Stream<Tuple> stream = streamBuilder.newStream(newSpout(Utils.DEFAULT_STREAM_ID)); + stream.to(newBolt()); + StormTopology topology = streamBuilder.build(); + assertEquals(1, topology.get_spouts_size()); + assertEquals(1, topology.get_bolts_size()); + String spoutId = topology.get_spouts().keySet().iterator().next(); + Map<GlobalStreamId, Grouping> expected = new HashMap<>(); + expected.put(new GlobalStreamId(spoutId, "default"), Grouping.shuffle(new NullStruct())); + assertEquals(expected, topology.get_bolts().values().iterator().next().get_common().get_inputs()); + } + + @Test + public void testBranch() throws Exception { + Stream<Tuple> stream = streamBuilder.newStream(newSpout(Utils.DEFAULT_STREAM_ID)); + Stream<Tuple>[] streams = stream.branch(x -> true); + StormTopology topology = streamBuilder.build(); + assertEquals(1, topology.get_spouts_size()); + assertEquals(1, topology.get_bolts_size()); + Map<GlobalStreamId, Grouping> expected = new HashMap<>(); + String spoutId = topology.get_spouts().keySet().iterator().next(); + expected.put(new GlobalStreamId(spoutId, "default"), Grouping.shuffle(new NullStruct())); + assertEquals(expected, topology.get_bolts().values().iterator().next().get_common().get_inputs()); + assertEquals(1, streams.length); + assertEquals(1, streams[0].node.getOutputStreams().size()); + String parentStream = streams[0].node.getOutputStreams().iterator().next() + "-branch"; + assertEquals(1, streams[0].node.getParents(parentStream).size()); + Node processorNdoe = streams[0].node.getParents(parentStream).iterator().next(); + assertTrue(processorNdoe instanceof ProcessorNode); + assertTrue(((ProcessorNode) processorNdoe).getProcessor() instanceof BranchProcessor); + assertTrue(processorNdoe.getParents("default").iterator().next() instanceof SpoutNode); + } + + @Test + public void testJoin() throws Exception { + Stream<Integer> stream = streamBuilder.newStream(newSpout(Utils.DEFAULT_STREAM_ID), new ValueMapper<>(0)); + Stream<Integer>[] streams = stream.branch(x -> x % 2 == 0, x-> x % 3 == 0); + PairStream<Integer, Integer> s1 = streams[0].mapToPair(x -> Pair.of(x, 1)); + PairStream<Integer, Integer> s2 = streams[1].mapToPair(x -> Pair.of(x, 1)); + PairStream<Integer, Pair<Integer, Integer>> sj = s1.join(s2); + assertEquals(Collections.singleton(s1.node), sj.node.getParents(s1.stream)); + assertEquals(Collections.singleton(s2.node), sj.node.getParents(s2.stream)); + } + + @Test + public void testGroupBy() throws Exception { + PairStream<String, String> stream = streamBuilder.newStream(newSpout(Utils.DEFAULT_STREAM_ID), new PairValueMapper<>(0, 1)); + + stream.groupByKey().aggregateByKey(new Count<>()); + + StormTopology topology = streamBuilder.build(); + assertEquals(2, topology.get_bolts_size()); + Bolt bolt1 = topology.get_bolts().get("bolt1"); + Bolt bolt2 = topology.get_bolts().get("bolt2"); + assertEquals(Grouping.shuffle(new NullStruct()), bolt1.get_common().get_inputs().values().iterator().next()); + assertEquals(Grouping.fields(Collections.singletonList("key")), bolt2.get_common().get_inputs().values().iterator().next()); + } + + @Test + public void testGlobalAggregate() throws Exception { + Stream<String> stream = streamBuilder.newStream(newSpout(Utils.DEFAULT_STREAM_ID), new ValueMapper<>(0)); + + stream.aggregate(new Count<>()); + + StormTopology topology = streamBuilder.build(); + assertEquals(2, topology.get_bolts_size()); + Bolt bolt1 = topology.get_bolts().get("bolt1"); + Bolt bolt2 = topology.get_bolts().get("bolt2"); + String spoutId = topology.get_spouts().keySet().iterator().next(); + Map<GlobalStreamId, Grouping> expected1 = new HashMap<>(); + expected1.put(new GlobalStreamId(spoutId, "default"), Grouping.shuffle(new NullStruct())); + Map<GlobalStreamId, Grouping> expected2 = new HashMap<>(); + expected2.put(new GlobalStreamId("bolt1", "s1"), Grouping.fields(Collections.emptyList())); + assertEquals(expected1, bolt1.get_common().get_inputs()); + assertEquals(expected2, bolt2.get_common().get_inputs()); + } + + @Test + public void testRepartition() throws Exception { + Stream<String> stream = streamBuilder.newStream(newSpout(Utils.DEFAULT_STREAM_ID), new ValueMapper<>(0)); + stream.repartition(3).filter(x -> true).repartition(2).filter(x -> true).aggregate(new Count<>()); + StormTopology topology = streamBuilder.build(); + assertEquals(1, topology.get_spouts_size()); + SpoutSpec spout = topology.get_spouts().get("spout1"); + assertEquals(4, topology.get_bolts_size()); + Bolt bolt1 = topology.get_bolts().get("bolt1"); + Bolt bolt2 = topology.get_bolts().get("bolt2"); + Bolt bolt3 = topology.get_bolts().get("bolt3"); + Bolt bolt4 = topology.get_bolts().get("bolt4"); + assertEquals(1, spout.get_common().get_parallelism_hint()); + assertEquals(1, bolt1.get_common().get_parallelism_hint()); + assertEquals(3, bolt2.get_common().get_parallelism_hint()); + assertEquals(2, bolt3.get_common().get_parallelism_hint()); + assertEquals(2, bolt4.get_common().get_parallelism_hint()); + } + + @Test + public void testBranchAndJoin() throws Exception { + TopologyContext mockContext = Mockito.mock(TopologyContext.class); + OutputCollector mockCollector = Mockito.mock(OutputCollector.class); + Stream<Integer> stream = streamBuilder.newStream(newSpout(Utils.DEFAULT_STREAM_ID), new ValueMapper<>(0)); + Stream<Integer>[] streams = stream.branch(x -> x % 2 == 0, x -> x % 2 == 1); + PairStream<Integer, Pair<Integer, Integer>> joined = streams[0].mapToPair(x -> Pair.of(x, 1)).join(streams[1].mapToPair(x -> Pair.of(x, 1))); + assertTrue(joined.getNode() instanceof ProcessorNode); + StormTopology topology = streamBuilder.build(); + assertEquals(1, topology.get_bolts_size()); + } + + private static IRichSpout newSpout(final String os) { + return new BaseRichSpout() { + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declareStream(os, new Fields("value")); + } + + @Override + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + + } + + @Override + public void nextTuple() { + + } + }; + } + + private static IRichBolt newBolt() { + return new BaseRichBolt() { + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + + } + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + + } + + @Override + public void execute(Tuple input) { + + } + }; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/test/jvm/org/apache/storm/streams/WindowedProcessorBoltTest.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/streams/WindowedProcessorBoltTest.java b/storm-core/test/jvm/org/apache/storm/streams/WindowedProcessorBoltTest.java new file mode 100644 index 0000000..7428e3f --- /dev/null +++ b/storm-core/test/jvm/org/apache/storm/streams/WindowedProcessorBoltTest.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.streams; + +import com.google.common.collect.Multimap; +import org.apache.storm.streams.operations.aggregators.Count; +import org.apache.storm.streams.processors.AggregateProcessor; +import org.apache.storm.streams.processors.Processor; +import org.apache.storm.streams.windowing.TumblingWindows; +import org.apache.storm.streams.windowing.Window; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.base.BaseWindowedBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.apache.storm.windowing.TupleWindow; +import org.jgrapht.DirectedGraph; +import org.jgrapht.graph.DefaultDirectedGraph; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; + +import static org.junit.Assert.assertEquals; + +/** + * Unit test for {@link WindowedProcessorBolt} + */ +public class WindowedProcessorBoltTest { + TopologyContext mockTopologyContext; + OutputCollector mockOutputCollector; + WindowedProcessorBolt bolt; + Tuple mockTuple1; + Tuple mockTuple2; + Tuple mockTuple3; + DirectedGraph<Node, Edge> graph; + Multimap<String, ProcessorNode> mockStreamToProcessors; + + @Before + public void setUp() throws Exception { + mockTopologyContext = Mockito.mock(TopologyContext.class); + mockOutputCollector = Mockito.mock(OutputCollector.class); + mockTuple1 = Mockito.mock(Tuple.class); + mockTuple2 = Mockito.mock(Tuple.class); + mockTuple3 = Mockito.mock(Tuple.class); + setUpMockTuples(mockTuple1, mockTuple2, mockTuple3); + mockStreamToProcessors = Mockito.mock(Multimap.class); + } + + @Test + public void testEmit() throws Exception { + Window<?, ?> window = TumblingWindows.of(BaseWindowedBolt.Count.of(2)); + setUpWindowedProcessorBolt(new AggregateProcessor<>(new Count<>()), window); + bolt.execute(getMockTupleWindow(mockTuple1, mockTuple2, mockTuple3)); + ArgumentCaptor<Values> values = ArgumentCaptor.forClass(Values.class); + ArgumentCaptor<String> os = ArgumentCaptor.forClass(String.class); + Mockito.verify(mockOutputCollector, Mockito.times(2)).emit(os.capture(), values.capture()); + assertEquals("outputstream", os.getAllValues().get(0)); + assertEquals(new Values(3L), values.getAllValues().get(0)); + assertEquals("outputstream", os.getAllValues().get(1)); + assertEquals(new Values(WindowNode.PUNCTUATION), values.getAllValues().get(1)); + } + + private void setUpWindowedProcessorBolt(Processor<?> processor, Window<?, ?> window) { + ProcessorNode node = new ProcessorNode(processor, "outputstream", new Fields("value")); + node.setWindowed(true); + Mockito.when(mockStreamToProcessors.get(Mockito.anyString())).thenReturn(Collections.singletonList(node)); + Mockito.when(mockStreamToProcessors.keySet()).thenReturn(Collections.singleton("inputstream")); + graph = new DefaultDirectedGraph<>(new StreamsEdgeFactory()); + graph.addVertex(node); + bolt = new WindowedProcessorBolt("bolt1", graph, Collections.singletonList(node), window); + bolt.setStreamToInitialProcessors(mockStreamToProcessors); + bolt.prepare(new HashMap<>(), mockTopologyContext, mockOutputCollector); + } + + private void setUpMockTuples(Tuple... tuples) { + for (Tuple tuple : tuples) { + Mockito.when(tuple.size()).thenReturn(1); + Mockito.when(tuple.getValue(0)).thenReturn(100); + Mockito.when(tuple.getSourceComponent()).thenReturn("bolt0"); + Mockito.when(tuple.getSourceStreamId()).thenReturn("inputstream"); + } + } + + private TupleWindow getMockTupleWindow(Tuple... tuples) { + TupleWindow tupleWindow = Mockito.mock(TupleWindow.class); + Mockito.when(tupleWindow.get()).thenReturn(Arrays.asList(tuples)); + return tupleWindow; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/test/jvm/org/apache/storm/windowing/WindowManagerTest.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/windowing/WindowManagerTest.java b/storm-core/test/jvm/org/apache/storm/windowing/WindowManagerTest.java index 6645566..6c170c6 100644 --- a/storm-core/test/jvm/org/apache/storm/windowing/WindowManagerTest.java +++ b/storm-core/test/jvm/org/apache/storm/windowing/WindowManagerTest.java @@ -60,7 +60,7 @@ public class WindowManagerTest { } @Override - public void onActivation(List<Integer> events, List<Integer> newEvents, List<Integer> expired) { + public void onActivation(List<Integer> events, List<Integer> newEvents, List<Integer> expired, Long timestamp) { onActivationEvents = events; allOnActivationEvents.add(events); onActivationNewEvents = newEvents;