Repository: storm
Updated Branches:
  refs/heads/master a7cdfefd7 -> b5f02d4e4


http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/topology/StatefulWindowedBoltExecutor.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/topology/StatefulWindowedBoltExecutor.java
 
b/storm-core/src/jvm/org/apache/storm/topology/StatefulWindowedBoltExecutor.java
index aff46e6..2fc6f8d 100644
--- 
a/storm-core/src/jvm/org/apache/storm/topology/StatefulWindowedBoltExecutor.java
+++ 
b/storm-core/src/jvm/org/apache/storm/topology/StatefulWindowedBoltExecutor.java
@@ -207,14 +207,14 @@ public class StatefulWindowedBoltExecutor<T extends 
State> extends WindowedBoltE
             }
 
             @Override
-            public void onActivation(List<Tuple> events, List<Tuple> 
newEvents, List<Tuple> expired) {
+            public void onActivation(List<Tuple> events, List<Tuple> 
newEvents, List<Tuple> expired, Long timestamp) {
                 if (isRecovering()) {
                     String msg = String.format("Unexpected activation with 
events %s, newEvents %s, expired %s in recovering state. " +
                                                        "recoveryStates %s ", 
events, newEvents, expired, recoveryStates);
                     LOG.error(msg);
                     throw new IllegalStateException(msg);
                 } else {
-                    parentListener.onActivation(events, newEvents, expired);
+                    parentListener.onActivation(events, newEvents, expired, 
timestamp);
                     updateWindowState(expired, newEvents);
                 }
             }

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java 
b/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
index fd98274..b592e0b 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
@@ -327,9 +327,9 @@ public class WindowedBoltExecutor implements IRichBolt {
             }
 
             @Override
-            public void onActivation(List<Tuple> tuples, List<Tuple> 
newTuples, List<Tuple> expiredTuples) {
+            public void onActivation(List<Tuple> tuples, List<Tuple> 
newTuples, List<Tuple> expiredTuples, Long timestamp) {
                 windowedOutputCollector.setContext(tuples);
-                bolt.execute(new TupleWindowImpl(tuples, newTuples, 
expiredTuples));
+                bolt.execute(new TupleWindowImpl(tuples, newTuples, 
expiredTuples, timestamp));
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java 
b/storm-core/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java
index 2e2af0d..ba97c26 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java
@@ -27,6 +27,7 @@ import org.apache.storm.windowing.TimestampExtractor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -40,7 +41,7 @@ public abstract class BaseWindowedBolt implements 
IWindowedBolt {
     /**
      * Holds a count value for count based windows and sliding intervals.
      */
-    public static class Count {
+    public static class Count implements Serializable {
         public final int value;
 
         public Count(int value) {
@@ -58,6 +59,22 @@ public abstract class BaseWindowedBolt implements 
IWindowedBolt {
         }
 
         @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            Count count = (Count) o;
+
+            return value == count.value;
+
+        }
+
+        @Override
+        public int hashCode() {
+            return value;
+        }
+
+        @Override
         public String toString() {
             return "Count{" +
                     "value=" + value +
@@ -68,7 +85,7 @@ public abstract class BaseWindowedBolt implements 
IWindowedBolt {
     /**
      * Holds a Time duration for time based windows and sliding intervals.
      */
-    public static class Duration {
+    public static class Duration implements Serializable {
         public final int value;
 
         public Duration(int value, TimeUnit timeUnit) {
@@ -126,6 +143,22 @@ public abstract class BaseWindowedBolt implements 
IWindowedBolt {
         }
 
         @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            Duration duration = (Duration) o;
+
+            return value == duration.value;
+
+        }
+
+        @Override
+        public int hashCode() {
+            return value;
+        }
+
+        @Override
         public String toString() {
             return "Duration{" +
                     "value=" + value +

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
index f93527a..a8fbb41 100644
--- 
a/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
@@ -121,7 +121,7 @@ public abstract class AbstractTridentWindowManager<T> 
implements ITridentWindowM
         }
 
         @Override
-        public void onActivation(List<T> events, List<T> newEvents, List<T> 
expired) {
+        public void onActivation(List<T> events, List<T> newEvents, List<T> 
expired, Long timestamp) {
             LOG.debug("onActivation is invoked with events size: [{}]", 
events.size());
             // trigger occurred, create an aggregation and keep them in store
             int currentTriggerId = triggerId.incrementAndGet();

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/windowing/CountEvictionPolicy.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/windowing/CountEvictionPolicy.java 
b/storm-core/src/jvm/org/apache/storm/windowing/CountEvictionPolicy.java
index fb12202..6a9a4f8 100644
--- a/storm-core/src/jvm/org/apache/storm/windowing/CountEvictionPolicy.java
+++ b/storm-core/src/jvm/org/apache/storm/windowing/CountEvictionPolicy.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicLong;
 public class CountEvictionPolicy<T> implements EvictionPolicy<T> {
     protected final int threshold;
     protected final AtomicLong currentCount;
+    private EvictionContext context;
 
     public CountEvictionPolicy(int count) {
         this.threshold = count;
@@ -62,7 +63,12 @@ public class CountEvictionPolicy<T> implements 
EvictionPolicy<T> {
 
     @Override
     public void setContext(EvictionContext context) {
-        // NOOP
+        this.context = context;
+    }
+
+    @Override
+    public EvictionContext getContext() {
+        return context;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/windowing/EvictionPolicy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/windowing/EvictionPolicy.java 
b/storm-core/src/jvm/org/apache/storm/windowing/EvictionPolicy.java
index 05e4d93..774d0a3 100644
--- a/storm-core/src/jvm/org/apache/storm/windowing/EvictionPolicy.java
+++ b/storm-core/src/jvm/org/apache/storm/windowing/EvictionPolicy.java
@@ -72,4 +72,11 @@ public interface EvictionPolicy<T> {
      */
     void setContext(EvictionContext context);
 
+    /**
+     * Returns the current context that is part of this eviction policy
+     *
+     * @return the eviction context
+     */
+    EvictionContext getContext();
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java 
b/storm-core/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java
index e646207..802e6bb 100644
--- a/storm-core/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java
+++ b/storm-core/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java
@@ -22,11 +22,7 @@ package org.apache.storm.windowing;
  */
 public class TimeEvictionPolicy<T> implements EvictionPolicy<T> {
     private final int windowLength;
-    /**
-     * The reference time in millis for window calculations and
-     * expiring events. If not set it will default to 
System.currentTimeMillis()
-     */
-    protected Long referenceTime;
+    protected EvictionContext evictionContext;
 
     /**
      * Constructs a TimeEvictionPolicy that evicts events older
@@ -43,7 +39,7 @@ public class TimeEvictionPolicy<T> implements 
EvictionPolicy<T> {
      */
     @Override
     public Action evict(Event<T> event) {
-        long now = referenceTime == null ? System.currentTimeMillis() : 
referenceTime;
+        long now = evictionContext == null ? System.currentTimeMillis() : 
evictionContext.getReferenceTime();
         long diff = now - event.getTimestamp();
         if (diff >= windowLength) {
             return Action.EXPIRE;
@@ -58,14 +54,19 @@ public class TimeEvictionPolicy<T> implements 
EvictionPolicy<T> {
 
     @Override
     public void setContext(EvictionContext context) {
-        referenceTime = context.getReferenceTime();
+        this.evictionContext = context;
+    }
+
+    @Override
+    public EvictionContext getContext() {
+        return evictionContext;
     }
 
     @Override
     public String toString() {
         return "TimeEvictionPolicy{" +
                 "windowLength=" + windowLength +
-                ", referenceTime=" + referenceTime +
+                ", evictionContext=" + evictionContext +
                 '}';
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/windowing/TupleWindowImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/windowing/TupleWindowImpl.java 
b/storm-core/src/jvm/org/apache/storm/windowing/TupleWindowImpl.java
index 2560d25..1e8b022 100644
--- a/storm-core/src/jvm/org/apache/storm/windowing/TupleWindowImpl.java
+++ b/storm-core/src/jvm/org/apache/storm/windowing/TupleWindowImpl.java
@@ -28,11 +28,17 @@ public class TupleWindowImpl implements TupleWindow {
     private final List<Tuple> tuples;
     private final List<Tuple> newTuples;
     private final List<Tuple> expiredTuples;
+    private final Long timestamp;
 
     public TupleWindowImpl(List<Tuple> tuples, List<Tuple> newTuples, 
List<Tuple> expiredTuples) {
+        this(tuples, newTuples, expiredTuples, null);
+    }
+
+    public TupleWindowImpl(List<Tuple> tuples, List<Tuple> newTuples, 
List<Tuple> expiredTuples, Long timestamp) {
         this.tuples = tuples;
         this.newTuples = newTuples;
         this.expiredTuples = expiredTuples;
+        this.timestamp = timestamp;
     }
 
     @Override
@@ -51,6 +57,11 @@ public class TupleWindowImpl implements TupleWindow {
     }
 
     @Override
+    public Long getTimestamp() {
+        return timestamp != null ? timestamp : System.currentTimeMillis();
+    }
+
+    @Override
     public String toString() {
         return "TupleWindowImpl{" +
                 "tuples=" + tuples +

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java
 
b/storm-core/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java
index 74240bb..7304366 100644
--- 
a/storm-core/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java
+++ 
b/storm-core/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java
@@ -24,11 +24,6 @@ package org.apache.storm.windowing;
  * @param <T> the type of event tracked by this policy.
  */
 public class WatermarkCountEvictionPolicy<T> extends CountEvictionPolicy<T> {
-    /*
-     * The reference time in millis for window calculations and
-     * expiring events. If not set it will default to 
System.currentTimeMillis()
-     */
-    private long referenceTime;
     private long processed = 0L;
 
     public WatermarkCountEvictionPolicy(int count) {
@@ -38,7 +33,7 @@ public class WatermarkCountEvictionPolicy<T> extends 
CountEvictionPolicy<T> {
     @Override
     public Action evict(Event<T> event) {
         Action action;
-        if (event.getTimestamp() <= referenceTime && processed < 
currentCount.get()) {
+        if (event.getTimestamp() <= super.getContext().getReferenceTime() && 
processed < currentCount.get()) {
             action = super.evict(event);
             if (action == Action.PROCESS) {
                 ++processed;
@@ -56,7 +51,7 @@ public class WatermarkCountEvictionPolicy<T> extends 
CountEvictionPolicy<T> {
 
     @Override
     public void setContext(EvictionContext context) {
-        referenceTime = context.getReferenceTime();
+        super.setContext(context);
         if (context.getCurrentCount() != null) {
             currentCount.set(context.getCurrentCount());
         } else {
@@ -68,7 +63,6 @@ public class WatermarkCountEvictionPolicy<T> extends 
CountEvictionPolicy<T> {
     @Override
     public String toString() {
         return "WatermarkCountEvictionPolicy{" +
-                "referenceTime=" + referenceTime +
                 "} " + super.toString();
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/windowing/WatermarkTimeEvictionPolicy.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/windowing/WatermarkTimeEvictionPolicy.java
 
b/storm-core/src/jvm/org/apache/storm/windowing/WatermarkTimeEvictionPolicy.java
index 53361d2..e5ecba4 100644
--- 
a/storm-core/src/jvm/org/apache/storm/windowing/WatermarkTimeEvictionPolicy.java
+++ 
b/storm-core/src/jvm/org/apache/storm/windowing/WatermarkTimeEvictionPolicy.java
@@ -45,7 +45,6 @@ public class WatermarkTimeEvictionPolicy<T> extends 
TimeEvictionPolicy<T> {
      */
     public WatermarkTimeEvictionPolicy(int windowLength, int lag) {
         super(windowLength);
-        referenceTime = 0L;
         this.lag = lag;
     }
 
@@ -58,7 +57,8 @@ public class WatermarkTimeEvictionPolicy<T> extends 
TimeEvictionPolicy<T> {
      */
     @Override
     public Action evict(Event<T> event) {
-        long diff = referenceTime - event.getTimestamp();
+        long referenceTime = evictionContext.getReferenceTime() != null ? 
evictionContext.getReferenceTime() : 0L;
+        long diff =  referenceTime - event.getTimestamp();
         if (diff < -lag) {
             return Action.STOP;
         } else if (diff < 0) {

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/windowing/Window.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/windowing/Window.java 
b/storm-core/src/jvm/org/apache/storm/windowing/Window.java
index 8382448..9a62eef 100644
--- a/storm-core/src/jvm/org/apache/storm/windowing/Window.java
+++ b/storm-core/src/jvm/org/apache/storm/windowing/Window.java
@@ -45,4 +45,11 @@ public interface Window<T> {
      * @return the list of events expired from the window.
      */
     List<T> getExpired();
+
+    /**
+     * If processing based on event time, returns the watermark time otherwise 
the current timestamp.
+     *
+     * @return the window timestamp
+     */
+    Long getTimestamp();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/windowing/WindowLifecycleListener.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/windowing/WindowLifecycleListener.java 
b/storm-core/src/jvm/org/apache/storm/windowing/WindowLifecycleListener.java
index 03c0213..ea2c997 100644
--- a/storm-core/src/jvm/org/apache/storm/windowing/WindowLifecycleListener.java
+++ b/storm-core/src/jvm/org/apache/storm/windowing/WindowLifecycleListener.java
@@ -37,6 +37,7 @@ public interface WindowLifecycleListener<T> {
      * @param events the list of current events in the window.
      * @param newEvents the newly added events since last activation.
      * @param expired the expired events since last activation.
+     * @param referenceTime the reference (event or processing) time that 
resulted in activation
      */
-    void onActivation(List<T> events, List<T> newEvents, List<T> expired);
+    void onActivation(List<T> events, List<T> newEvents, List<T> expired, Long 
referenceTime);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/windowing/WindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/windowing/WindowManager.java 
b/storm-core/src/jvm/org/apache/storm/windowing/WindowManager.java
index 792509e..8021ba8 100644
--- a/storm-core/src/jvm/org/apache/storm/windowing/WindowManager.java
+++ b/storm-core/src/jvm/org/apache/storm/windowing/WindowManager.java
@@ -142,7 +142,7 @@ public class WindowManager<T> implements TriggerHandler {
         if (!events.isEmpty()) {
             prevWindowEvents.addAll(windowEvents);
             LOG.debug("invoking windowLifecycleListener onActivation, [{}] 
events in window.", events.size());
-            windowLifecycleListener.onActivation(events, newEvents, expired);
+            windowLifecycleListener.onActivation(events, newEvents, expired, 
evictionPolicy.getContext().getReferenceTime());
         } else {
             LOG.debug("No events in the window, skipping onActivation");
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/test/jvm/org/apache/storm/streams/ProcessorBoltTest.java
----------------------------------------------------------------------
diff --git 
a/storm-core/test/jvm/org/apache/storm/streams/ProcessorBoltTest.java 
b/storm-core/test/jvm/org/apache/storm/streams/ProcessorBoltTest.java
new file mode 100644
index 0000000..e9d5127
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/streams/ProcessorBoltTest.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams;
+
+import com.google.common.collect.Multimap;
+import org.apache.storm.streams.operations.aggregators.Sum;
+import org.apache.storm.streams.processors.AggregateProcessor;
+import org.apache.storm.streams.processors.FilterProcessor;
+import org.apache.storm.streams.processors.Processor;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.jgrapht.DirectedGraph;
+import org.jgrapht.graph.DefaultDirectedGraph;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Set;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for {@link ProcessorBolt}
+ */
+public class ProcessorBoltTest {
+    TopologyContext mockTopologyContext;
+    OutputCollector mockOutputCollector;
+    ProcessorBolt bolt;
+    Tuple mockTuple1;
+    Tuple mockTuple2;
+    Tuple mockTuple3;
+    Tuple punctuation;
+    Multimap<String, ProcessorNode> mockStreamToProcessors;
+    DirectedGraph<Node, Edge> graph;
+
+    @Before
+    public void setUp() throws Exception {
+        mockTopologyContext = Mockito.mock(TopologyContext.class);
+        mockOutputCollector = Mockito.mock(OutputCollector.class);
+        mockTuple1 = Mockito.mock(Tuple.class);
+        mockTuple2 = Mockito.mock(Tuple.class);
+        mockTuple3 = Mockito.mock(Tuple.class);
+        setUpMockTuples(mockTuple1, mockTuple2, mockTuple3);
+        punctuation = Mockito.mock(Tuple.class);
+        setUpPunctuation(punctuation);
+        mockStreamToProcessors = Mockito.mock(Multimap.class);
+        graph = new DefaultDirectedGraph(new StreamsEdgeFactory());
+
+    }
+
+    @Test
+    public void testEmitAndAck() throws Exception {
+        setUpProcessorBolt(new FilterProcessor<Integer>(x -> true));
+        bolt.execute(mockTuple1);
+        ArgumentCaptor<Collection> anchor = 
ArgumentCaptor.forClass(Collection.class);
+        ArgumentCaptor<Values> values = ArgumentCaptor.forClass(Values.class);
+        ArgumentCaptor<String> os = ArgumentCaptor.forClass(String.class);
+        Mockito.verify(mockOutputCollector).emit(os.capture(), 
anchor.capture(), values.capture());
+        assertEquals("outputstream", os.getValue());
+        assertArrayEquals(new Object[]{mockTuple1}, 
anchor.getValue().toArray());
+        assertEquals(new Values(100), values.getValue());
+        Mockito.verify(mockOutputCollector, Mockito.times(1)).ack(mockTuple1);
+    }
+
+    @Test
+    public void testAggResultAndAck() throws Exception {
+        setUpProcessorBolt(new AggregateProcessor<>(new Sum()), 
Collections.singleton("inputstream"), true, null);
+        bolt.execute(mockTuple2);
+        bolt.execute(mockTuple3);
+        bolt.execute(punctuation);
+        ArgumentCaptor<Collection> anchor = 
ArgumentCaptor.forClass(Collection.class);
+        ArgumentCaptor<Values> values = ArgumentCaptor.forClass(Values.class);
+        ArgumentCaptor<String> os = ArgumentCaptor.forClass(String.class);
+        Mockito.verify(mockOutputCollector, 
Mockito.times(2)).emit(os.capture(), anchor.capture(), values.capture());
+        assertArrayEquals(new Object[]{mockTuple2, mockTuple3, punctuation}, 
anchor.getAllValues().get(0).toArray());
+        assertArrayEquals(new Object[]{mockTuple2, mockTuple3, punctuation}, 
anchor.getAllValues().get(1).toArray());
+        assertArrayEquals(new Object[]{new Values(200L), new 
Values("__punctuation")}, values.getAllValues().toArray());
+        assertArrayEquals(new Object[]{"outputstream", "outputstream"}, 
os.getAllValues().toArray());
+        Mockito.verify(mockOutputCollector).ack(mockTuple2);
+        Mockito.verify(mockOutputCollector).ack(mockTuple3);
+        Mockito.verify(mockOutputCollector).ack(punctuation);
+    }
+
+    @Test
+    public void testEmitTs() throws Exception {
+        Tuple tupleWithTs = Mockito.mock(Tuple.class);
+        setUpMockTuples(tupleWithTs);
+        Mockito.when(tupleWithTs.getLongByField("ts")).thenReturn(12345L);
+        setUpProcessorBolt(new FilterProcessor(x -> true), "ts");
+        bolt.execute(tupleWithTs);
+        ArgumentCaptor<Collection> anchor = 
ArgumentCaptor.forClass(Collection.class);
+        ArgumentCaptor<Values> values = ArgumentCaptor.forClass(Values.class);
+        ArgumentCaptor<String> os = ArgumentCaptor.forClass(String.class);
+        Mockito.verify(mockOutputCollector).emit(os.capture(), 
anchor.capture(), values.capture());
+        assertEquals("outputstream", os.getValue());
+        assertArrayEquals(new Object[]{tupleWithTs}, 
anchor.getValue().toArray());
+        assertEquals(new Values(100, 12345L), values.getValue());
+        Mockito.verify(mockOutputCollector, Mockito.times(1)).ack(tupleWithTs);
+    }
+
+    private void setUpProcessorBolt(Processor<?> processor) {
+        setUpProcessorBolt(processor, Collections.emptySet(), false, null);
+    }
+
+    private void setUpProcessorBolt(Processor<?> processor, String 
tsFieldName) {
+        setUpProcessorBolt(processor, Collections.emptySet(), false, 
tsFieldName);
+    }
+
+    private void setUpProcessorBolt(Processor<?> processor,
+                                    Set<String> windowedParentStreams,
+                                    boolean isWindowed,
+                                    String tsFieldName) {
+        ProcessorNode node = new ProcessorNode(processor, "outputstream", new 
Fields("value"));
+        node.setWindowedParentStreams(windowedParentStreams);
+        node.setWindowed(isWindowed);
+        
Mockito.when(mockStreamToProcessors.get(Mockito.anyString())).thenReturn(Collections.singletonList(node));
+        graph.addVertex(node);
+        bolt = new ProcessorBolt("bolt1", graph, 
Collections.singletonList(node));
+        if (tsFieldName != null && !tsFieldName.isEmpty()) {
+            bolt.setTimestampField(tsFieldName);
+        }
+        bolt.setStreamToInitialProcessors(mockStreamToProcessors);
+        bolt.prepare(new HashMap<>(), mockTopologyContext, 
mockOutputCollector);
+    }
+
+    private void setUpMockTuples(Tuple... tuples) {
+        for (Tuple tuple : tuples) {
+            Mockito.when(tuple.size()).thenReturn(1);
+            Mockito.when(tuple.getValue(0)).thenReturn(100);
+            Mockito.when(tuple.getSourceComponent()).thenReturn("bolt0");
+            Mockito.when(tuple.getSourceStreamId()).thenReturn("inputstream");
+        }
+    }
+
+    private void setUpPunctuation(Tuple punctuation) {
+        Mockito.when(punctuation.size()).thenReturn(1);
+        
Mockito.when(punctuation.getValue(0)).thenReturn(WindowNode.PUNCTUATION);
+        Mockito.when(punctuation.getSourceComponent()).thenReturn("bolt0");
+        
Mockito.when(punctuation.getSourceStreamId()).thenReturn("inputstream");
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/test/jvm/org/apache/storm/streams/StatefulProcessorBoltTest.java
----------------------------------------------------------------------
diff --git 
a/storm-core/test/jvm/org/apache/storm/streams/StatefulProcessorBoltTest.java 
b/storm-core/test/jvm/org/apache/storm/streams/StatefulProcessorBoltTest.java
new file mode 100644
index 0000000..dbc7e27
--- /dev/null
+++ 
b/storm-core/test/jvm/org/apache/storm/streams/StatefulProcessorBoltTest.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams;
+
+import com.google.common.collect.Multimap;
+import org.apache.storm.state.KeyValueState;
+import org.apache.storm.streams.operations.aggregators.Count;
+import org.apache.storm.streams.processors.Processor;
+import org.apache.storm.streams.processors.UpdateStateByKeyProcessor;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.jgrapht.DirectedGraph;
+import org.jgrapht.graph.DefaultDirectedGraph;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for {@link StatefulProcessorBolt}
+ */
+public class StatefulProcessorBoltTest {
+    TopologyContext mockTopologyContext;
+    OutputCollector mockOutputCollector;
+    StatefulProcessorBolt<String, Long> bolt;
+    Tuple mockTuple1;
+    DirectedGraph<Node, Edge> graph;
+    Multimap<String, ProcessorNode> mockStreamToProcessors;
+    KeyValueState<String, Long> mockKeyValueState;
+
+    @Before
+    public void setUp() throws Exception {
+        mockTopologyContext = Mockito.mock(TopologyContext.class);
+        mockOutputCollector = Mockito.mock(OutputCollector.class);
+        mockTuple1 = Mockito.mock(Tuple.class);
+        mockStreamToProcessors = Mockito.mock(Multimap.class);
+        mockKeyValueState = Mockito.mock(KeyValueState.class);
+        setUpMockTuples(mockTuple1);
+    }
+
+    @Test
+    public void testEmitAndAck() throws Exception {
+        setUpStatefulProcessorBolt(new UpdateStateByKeyProcessor<>(new 
Count<>()));
+        bolt.execute(mockTuple1);
+        ArgumentCaptor<Collection> anchor = 
ArgumentCaptor.forClass(Collection.class);
+        ArgumentCaptor<Values> values = ArgumentCaptor.forClass(Values.class);
+        ArgumentCaptor<String> os = ArgumentCaptor.forClass(String.class);
+        Mockito.verify(mockOutputCollector).emit(os.capture(), 
anchor.capture(), values.capture());
+        assertEquals("outputstream", os.getValue());
+        assertArrayEquals(new Object[]{mockTuple1}, 
anchor.getValue().toArray());
+        assertEquals(new Values("k", 1L), values.getValue());
+        Mockito.verify(mockOutputCollector, Mockito.times(1)).ack(mockTuple1);
+        Mockito.verify(mockKeyValueState, Mockito.times(1)).put("k", 1L );
+    }
+
+    private void setUpStatefulProcessorBolt(Processor<?> processor) {
+        ProcessorNode node = new ProcessorNode(processor, "outputstream", new 
Fields("value"));
+        
Mockito.when(mockStreamToProcessors.get(Mockito.anyString())).thenReturn(Collections.singletonList(node));
+        graph = new DefaultDirectedGraph(new StreamsEdgeFactory());
+        graph.addVertex(node);
+        bolt = new StatefulProcessorBolt<>("bolt1", graph, 
Collections.singletonList(node));
+        bolt.setStreamToInitialProcessors(mockStreamToProcessors);
+        bolt.prepare(new HashMap<>(), mockTopologyContext, 
mockOutputCollector);
+        bolt.initState(mockKeyValueState);
+    }
+
+    private void setUpMockTuples(Tuple... tuples) {
+        for (Tuple tuple : tuples) {
+            Mockito.when(tuple.size()).thenReturn(1);
+            Mockito.when(tuple.getValue(0)).thenReturn(Pair.of("k", "v"));
+            Mockito.when(tuple.getSourceComponent()).thenReturn("bolt0");
+            Mockito.when(tuple.getSourceStreamId()).thenReturn("inputstream");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/test/jvm/org/apache/storm/streams/StreamBuilderTest.java
----------------------------------------------------------------------
diff --git 
a/storm-core/test/jvm/org/apache/storm/streams/StreamBuilderTest.java 
b/storm-core/test/jvm/org/apache/storm/streams/StreamBuilderTest.java
new file mode 100644
index 0000000..1498ae4
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/streams/StreamBuilderTest.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams;
+
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.NullStruct;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.streams.operations.aggregators.Count;
+import org.apache.storm.streams.operations.mappers.PairValueMapper;
+import org.apache.storm.streams.operations.mappers.ValueMapper;
+import org.apache.storm.streams.processors.BranchProcessor;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.Utils;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link StreamBuilder}
+ */
+public class StreamBuilderTest {
+    StreamBuilder streamBuilder;
+
+    @Before
+    public void setUp() throws Exception {
+        streamBuilder = new StreamBuilder();
+        UniqueIdGen.getInstance().reset();
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testSpoutNoDefaultStream() throws Exception {
+        Stream<Tuple> stream = streamBuilder.newStream(newSpout("test"));
+        stream.filter(x -> true);
+        streamBuilder.build();
+    }
+
+    @Test
+    public void testSpoutToBolt() throws Exception {
+        Stream<Tuple> stream = 
streamBuilder.newStream(newSpout(Utils.DEFAULT_STREAM_ID));
+        stream.to(newBolt());
+        StormTopology topology = streamBuilder.build();
+        assertEquals(1, topology.get_spouts_size());
+        assertEquals(1, topology.get_bolts_size());
+        String spoutId = topology.get_spouts().keySet().iterator().next();
+        Map<GlobalStreamId, Grouping> expected = new HashMap<>();
+        expected.put(new GlobalStreamId(spoutId, "default"), 
Grouping.shuffle(new NullStruct()));
+        assertEquals(expected, 
topology.get_bolts().values().iterator().next().get_common().get_inputs());
+    }
+
+    @Test
+    public void testBranch() throws Exception {
+        Stream<Tuple> stream = 
streamBuilder.newStream(newSpout(Utils.DEFAULT_STREAM_ID));
+        Stream<Tuple>[] streams = stream.branch(x -> true);
+        StormTopology topology = streamBuilder.build();
+        assertEquals(1, topology.get_spouts_size());
+        assertEquals(1, topology.get_bolts_size());
+        Map<GlobalStreamId, Grouping> expected = new HashMap<>();
+        String spoutId = topology.get_spouts().keySet().iterator().next();
+        expected.put(new GlobalStreamId(spoutId, "default"), 
Grouping.shuffle(new NullStruct()));
+        assertEquals(expected, 
topology.get_bolts().values().iterator().next().get_common().get_inputs());
+        assertEquals(1, streams.length);
+        assertEquals(1, streams[0].node.getOutputStreams().size());
+        String parentStream = 
streams[0].node.getOutputStreams().iterator().next() + "-branch";
+        assertEquals(1, streams[0].node.getParents(parentStream).size());
+        Node processorNdoe = 
streams[0].node.getParents(parentStream).iterator().next();
+        assertTrue(processorNdoe instanceof ProcessorNode);
+        assertTrue(((ProcessorNode) processorNdoe).getProcessor() instanceof 
BranchProcessor);
+        assertTrue(processorNdoe.getParents("default").iterator().next() 
instanceof SpoutNode);
+    }
+
+    @Test
+    public void testJoin() throws Exception {
+        Stream<Integer> stream = 
streamBuilder.newStream(newSpout(Utils.DEFAULT_STREAM_ID), new 
ValueMapper<>(0));
+        Stream<Integer>[] streams = stream.branch(x -> x % 2 == 0, x-> x % 3 
== 0);
+        PairStream<Integer, Integer> s1 = streams[0].mapToPair(x -> Pair.of(x, 
1));
+        PairStream<Integer, Integer> s2 = streams[1].mapToPair(x -> Pair.of(x, 
1));
+        PairStream<Integer, Pair<Integer, Integer>> sj = s1.join(s2);
+        assertEquals(Collections.singleton(s1.node), 
sj.node.getParents(s1.stream));
+        assertEquals(Collections.singleton(s2.node), 
sj.node.getParents(s2.stream));
+    }
+
+    @Test
+    public void testGroupBy() throws Exception {
+        PairStream<String, String> stream = 
streamBuilder.newStream(newSpout(Utils.DEFAULT_STREAM_ID), new 
PairValueMapper<>(0, 1));
+
+        stream.groupByKey().aggregateByKey(new Count<>());
+
+        StormTopology topology = streamBuilder.build();
+        assertEquals(2, topology.get_bolts_size());
+        Bolt bolt1 = topology.get_bolts().get("bolt1");
+        Bolt bolt2 = topology.get_bolts().get("bolt2");
+        assertEquals(Grouping.shuffle(new NullStruct()), 
bolt1.get_common().get_inputs().values().iterator().next());
+        assertEquals(Grouping.fields(Collections.singletonList("key")), 
bolt2.get_common().get_inputs().values().iterator().next());
+    }
+
+    @Test
+    public void testGlobalAggregate() throws Exception {
+        Stream<String> stream = 
streamBuilder.newStream(newSpout(Utils.DEFAULT_STREAM_ID), new 
ValueMapper<>(0));
+
+        stream.aggregate(new Count<>());
+
+        StormTopology topology = streamBuilder.build();
+        assertEquals(2, topology.get_bolts_size());
+        Bolt bolt1 = topology.get_bolts().get("bolt1");
+        Bolt bolt2 = topology.get_bolts().get("bolt2");
+        String spoutId = topology.get_spouts().keySet().iterator().next();
+        Map<GlobalStreamId, Grouping> expected1 = new HashMap<>();
+        expected1.put(new GlobalStreamId(spoutId, "default"), 
Grouping.shuffle(new NullStruct()));
+        Map<GlobalStreamId, Grouping> expected2 = new HashMap<>();
+        expected2.put(new GlobalStreamId("bolt1", "s1"), 
Grouping.fields(Collections.emptyList()));
+        assertEquals(expected1, bolt1.get_common().get_inputs());
+        assertEquals(expected2, bolt2.get_common().get_inputs());
+    }
+
+    @Test
+    public void testRepartition() throws Exception {
+        Stream<String> stream = 
streamBuilder.newStream(newSpout(Utils.DEFAULT_STREAM_ID), new 
ValueMapper<>(0));
+        stream.repartition(3).filter(x -> true).repartition(2).filter(x -> 
true).aggregate(new Count<>());
+        StormTopology topology = streamBuilder.build();
+        assertEquals(1, topology.get_spouts_size());
+        SpoutSpec spout = topology.get_spouts().get("spout1");
+        assertEquals(4, topology.get_bolts_size());
+        Bolt bolt1 = topology.get_bolts().get("bolt1");
+        Bolt bolt2 = topology.get_bolts().get("bolt2");
+        Bolt bolt3 = topology.get_bolts().get("bolt3");
+        Bolt bolt4 = topology.get_bolts().get("bolt4");
+        assertEquals(1, spout.get_common().get_parallelism_hint());
+        assertEquals(1, bolt1.get_common().get_parallelism_hint());
+        assertEquals(3, bolt2.get_common().get_parallelism_hint());
+        assertEquals(2, bolt3.get_common().get_parallelism_hint());
+        assertEquals(2, bolt4.get_common().get_parallelism_hint());
+    }
+
+    @Test
+    public void testBranchAndJoin() throws Exception {
+        TopologyContext mockContext = Mockito.mock(TopologyContext.class);
+        OutputCollector mockCollector = Mockito.mock(OutputCollector.class);
+        Stream<Integer> stream = 
streamBuilder.newStream(newSpout(Utils.DEFAULT_STREAM_ID), new 
ValueMapper<>(0));
+        Stream<Integer>[] streams = stream.branch(x -> x % 2 == 0, x -> x % 2 
== 1);
+        PairStream<Integer, Pair<Integer, Integer>> joined = 
streams[0].mapToPair(x -> Pair.of(x, 1)).join(streams[1].mapToPair(x -> 
Pair.of(x, 1)));
+        assertTrue(joined.getNode() instanceof ProcessorNode);
+        StormTopology topology = streamBuilder.build();
+        assertEquals(1, topology.get_bolts_size());
+    }
+
+    private static IRichSpout newSpout(final String os) {
+        return new BaseRichSpout() {
+
+            @Override
+            public void declareOutputFields(OutputFieldsDeclarer declarer) {
+                declarer.declareStream(os, new Fields("value"));
+            }
+
+            @Override
+            public void open(Map conf, TopologyContext context, 
SpoutOutputCollector collector) {
+
+            }
+
+            @Override
+            public void nextTuple() {
+
+            }
+        };
+    }
+
+    private static IRichBolt newBolt() {
+        return new BaseRichBolt() {
+
+            @Override
+            public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+            }
+
+            @Override
+            public void prepare(Map stormConf, TopologyContext context, 
OutputCollector collector) {
+
+            }
+
+            @Override
+            public void execute(Tuple input) {
+
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/test/jvm/org/apache/storm/streams/WindowedProcessorBoltTest.java
----------------------------------------------------------------------
diff --git 
a/storm-core/test/jvm/org/apache/storm/streams/WindowedProcessorBoltTest.java 
b/storm-core/test/jvm/org/apache/storm/streams/WindowedProcessorBoltTest.java
new file mode 100644
index 0000000..7428e3f
--- /dev/null
+++ 
b/storm-core/test/jvm/org/apache/storm/streams/WindowedProcessorBoltTest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams;
+
+import com.google.common.collect.Multimap;
+import org.apache.storm.streams.operations.aggregators.Count;
+import org.apache.storm.streams.processors.AggregateProcessor;
+import org.apache.storm.streams.processors.Processor;
+import org.apache.storm.streams.windowing.TumblingWindows;
+import org.apache.storm.streams.windowing.Window;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.windowing.TupleWindow;
+import org.jgrapht.DirectedGraph;
+import org.jgrapht.graph.DefaultDirectedGraph;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit test for {@link WindowedProcessorBolt}
+ */
+public class WindowedProcessorBoltTest {
+    TopologyContext mockTopologyContext;
+    OutputCollector mockOutputCollector;
+    WindowedProcessorBolt bolt;
+    Tuple mockTuple1;
+    Tuple mockTuple2;
+    Tuple mockTuple3;
+    DirectedGraph<Node, Edge> graph;
+    Multimap<String, ProcessorNode> mockStreamToProcessors;
+
+    @Before
+    public void setUp() throws Exception {
+        mockTopologyContext = Mockito.mock(TopologyContext.class);
+        mockOutputCollector = Mockito.mock(OutputCollector.class);
+        mockTuple1 = Mockito.mock(Tuple.class);
+        mockTuple2 = Mockito.mock(Tuple.class);
+        mockTuple3 = Mockito.mock(Tuple.class);
+        setUpMockTuples(mockTuple1, mockTuple2, mockTuple3);
+        mockStreamToProcessors = Mockito.mock(Multimap.class);
+    }
+
+    @Test
+    public void testEmit() throws Exception {
+        Window<?, ?> window = TumblingWindows.of(BaseWindowedBolt.Count.of(2));
+        setUpWindowedProcessorBolt(new AggregateProcessor<>(new Count<>()), 
window);
+        bolt.execute(getMockTupleWindow(mockTuple1, mockTuple2, mockTuple3));
+        ArgumentCaptor<Values> values = ArgumentCaptor.forClass(Values.class);
+        ArgumentCaptor<String> os = ArgumentCaptor.forClass(String.class);
+        Mockito.verify(mockOutputCollector, 
Mockito.times(2)).emit(os.capture(), values.capture());
+        assertEquals("outputstream", os.getAllValues().get(0));
+        assertEquals(new Values(3L), values.getAllValues().get(0));
+        assertEquals("outputstream", os.getAllValues().get(1));
+        assertEquals(new Values(WindowNode.PUNCTUATION), 
values.getAllValues().get(1));
+    }
+
+    private void setUpWindowedProcessorBolt(Processor<?> processor, Window<?, 
?> window) {
+        ProcessorNode node = new ProcessorNode(processor, "outputstream", new 
Fields("value"));
+        node.setWindowed(true);
+        
Mockito.when(mockStreamToProcessors.get(Mockito.anyString())).thenReturn(Collections.singletonList(node));
+        
Mockito.when(mockStreamToProcessors.keySet()).thenReturn(Collections.singleton("inputstream"));
+        graph = new DefaultDirectedGraph<>(new StreamsEdgeFactory());
+        graph.addVertex(node);
+        bolt = new WindowedProcessorBolt("bolt1", graph, 
Collections.singletonList(node), window);
+        bolt.setStreamToInitialProcessors(mockStreamToProcessors);
+        bolt.prepare(new HashMap<>(), mockTopologyContext, 
mockOutputCollector);
+    }
+
+    private void setUpMockTuples(Tuple... tuples) {
+        for (Tuple tuple : tuples) {
+            Mockito.when(tuple.size()).thenReturn(1);
+            Mockito.when(tuple.getValue(0)).thenReturn(100);
+            Mockito.when(tuple.getSourceComponent()).thenReturn("bolt0");
+            Mockito.when(tuple.getSourceStreamId()).thenReturn("inputstream");
+        }
+    }
+
+    private TupleWindow getMockTupleWindow(Tuple... tuples) {
+        TupleWindow tupleWindow = Mockito.mock(TupleWindow.class);
+        Mockito.when(tupleWindow.get()).thenReturn(Arrays.asList(tuples));
+        return tupleWindow;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/test/jvm/org/apache/storm/windowing/WindowManagerTest.java
----------------------------------------------------------------------
diff --git 
a/storm-core/test/jvm/org/apache/storm/windowing/WindowManagerTest.java 
b/storm-core/test/jvm/org/apache/storm/windowing/WindowManagerTest.java
index 6645566..6c170c6 100644
--- a/storm-core/test/jvm/org/apache/storm/windowing/WindowManagerTest.java
+++ b/storm-core/test/jvm/org/apache/storm/windowing/WindowManagerTest.java
@@ -60,7 +60,7 @@ public class WindowManagerTest {
         }
 
         @Override
-        public void onActivation(List<Integer> events, List<Integer> 
newEvents, List<Integer> expired) {
+        public void onActivation(List<Integer> events, List<Integer> 
newEvents, List<Integer> expired, Long timestamp) {
             onActivationEvents = events;
             allOnActivationEvents.add(events);
             onActivationNewEvents = newEvents;

Reply via email to