Repository: storm
Updated Branches:
  refs/heads/1.x-branch e99961311 -> 81b398d6b


STORM-1873 Configurable behaviour for late tuples


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c49b6f7d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c49b6f7d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c49b6f7d

Branch: refs/heads/1.x-branch
Commit: c49b6f7d94d05b2872cd0edd6f1becef1ac1aae5
Parents: e999613
Author: Balazs Kossovics <[email protected]>
Authored: Thu Jun 2 17:43:10 2016 +0200
Committer: Arun Mahadevan <[email protected]>
Committed: Thu Jun 9 11:25:04 2016 +0530

----------------------------------------------------------------------
 docs/Windowing.md                               | 24 +++++-
 .../clj/org/apache/storm/daemon/executor.clj    |  1 +
 storm-core/src/jvm/org/apache/storm/Config.java |  8 ++
 .../storm/topology/WindowedBoltExecutor.java    | 25 +++++-
 .../topology/base/BaseStatefulWindowedBolt.java |  8 ++
 .../storm/topology/base/BaseWindowedBolt.java   | 14 ++++
 .../topology/WindowedBoltExecutorTest.java      | 87 +++++++++++++++++++-
 7 files changed, 161 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c49b6f7d/docs/Windowing.md
----------------------------------------------------------------------
diff --git a/docs/Windowing.md b/docs/Windowing.md
index 9f5869f..82212ea 100644
--- a/docs/Windowing.md
+++ b/docs/Windowing.md
@@ -149,10 +149,6 @@ The value for the above `fieldName` will be looked up from the incoming tuple an
 If the field is not present in the tuple an exception will be thrown. Along with the timestamp field name, a time lag parameter 
 can also be specified which indicates the max time limit for tuples with out of order timestamps. 
 
-E.g. If the lag is 5 secs and a tuple `t1` arrived with timestamp `06:00:05` no tuples may arrive with tuple timestamp earlier than `06:00:00`. If a tuple
-arrives with timestamp 05:59:59 after `t1` and the window has moved past `t1`, it will be treated as a late tuple and not processed. Currently the late
- tuples are just logged in the worker log files at INFO level.
-
 ```java
 /**
 * Specify the maximum time lag of the tuple timestamp in milliseconds. It means that the tuple timestamps
@@ -163,6 +159,26 @@ arrives with timestamp 05:59:59 after `t1` and the window has moved past `t1`, i
 public BaseWindowedBolt withLag(Duration duration)
 ```
 
+E.g. if the lag is 5 secs and a tuple `t1` arrives with timestamp `06:00:05`, no tuple may arrive with a timestamp earlier than `06:00:00`. If a tuple
+arrives with timestamp `05:59:59` after `t1` and the window has moved past `t1`, it will be treated as a late tuple. By default, late tuples are not processed;
+they are just logged in the worker log files at INFO level.
+
+```java
+/**
+ * Specify a stream id on which late tuples are going to be emitted. They are going to be accessible via the
+ * {@link org.apache.storm.topology.WindowedBoltExecutor#LATE_TUPLE_FIELD} field.
+ * It must be defined on a per-component basis, and in conjunction with the
+ * {@link BaseWindowedBolt#withTimestampField}, otherwise {@link IllegalArgumentException} will be thrown.
+ *
+ * @param streamId the name of the stream used to emit late tuples on
+ */
+public BaseWindowedBolt withLateTupleStream(String streamId)
+
+```
+This behaviour can be changed by passing a stream id to `withLateTupleStream`. In that case, late tuples are emitted on the specified stream and can be accessed
+via the field `WindowedBoltExecutor.LATE_TUPLE_FIELD`.
+
+
 ### Watermarks
 For processing tuples with timestamp field, storm internally computes watermarks based on the incoming tuple timestamp. Watermark is 
 the minimum of the latest tuple timestamps (minus the lag) across all the input streams. At a higher level this is similar to the watermark concept

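The lag arithmetic described in the Windowing.md hunk can be replayed with a small model. This is a simplified, hypothetical sketch (the class and method names are illustrative, not Storm API): it assumes the watermark is the minimum latest timestamp across the streams seen so far minus the lag, and treats a tuple as late when its timestamp is below the last computed watermark. Storm itself computes watermarks periodically (see the watermark event interval settings); here the caller decides when to advance.

```java
import java.time.LocalTime;
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model of the late-tuple rule; not Storm's
// WaterMarkEventGenerator, only the arithmetic described in Windowing.md.
class LateTupleCheck {
    private final long lagMs;
    // Latest timestamp seen so far on each input stream.
    private final Map<String, Long> latestTsPerStream = new HashMap<>();
    // Last watermark computed; nothing is late before the first watermark.
    private long lastWatermark = Long.MIN_VALUE;

    LateTupleCheck(long lagMs) {
        this.lagMs = lagMs;
    }

    // Records the tuple timestamp and reports whether it is behind the last watermark.
    boolean isLate(String streamId, long ts) {
        latestTsPerStream.merge(streamId, ts, Long::max);
        return ts < lastWatermark;
    }

    // Watermark = min over the streams seen so far of (latest ts - lag).
    void advanceWatermark() {
        if (latestTsPerStream.isEmpty()) {
            return;
        }
        long minLatest = Long.MAX_VALUE;
        for (long ts : latestTsPerStream.values()) {
            minLatest = Math.min(minLatest, ts);
        }
        // Watermarks only move forward.
        lastWatermark = Math.max(lastWatermark, minLatest - lagMs);
    }

    long watermark() {
        return lastWatermark;
    }

    // Milliseconds since midnight for an "HH:mm:ss" string.
    static long ms(String hhmmss) {
        return LocalTime.parse(hhmmss).toSecondOfDay() * 1000L;
    }

    public static void main(String[] args) {
        LateTupleCheck check = new LateTupleCheck(5_000); // 5 secs lag
        System.out.println(check.isLate("s1", ms("06:00:05"))); // false
        check.advanceWatermark();                              // watermark is now 06:00:00
        System.out.println(check.isLate("s1", ms("05:59:59"))); // true
    }
}
```

Replaying the timestamps used by `testExecuteWithLateTupleStream` (603 through 636, then 600, with a lag of 5) through this model, advancing the watermark after every tuple, flags only the final 600 as late, which is the tuple that test expects on the `$late` stream.
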
http://git-wip-us.apache.org/repos/asf/storm/blob/c49b6f7d/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 8c835d5..d50e494 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -173,6 +173,7 @@
                         TOPOLOGY-BOLTS-SLIDING-INTERVAL-COUNT
                         TOPOLOGY-BOLTS-SLIDING-INTERVAL-DURATION-MS
                         TOPOLOGY-BOLTS-TUPLE-TIMESTAMP-FIELD-NAME
+                        TOPOLOGY-BOLTS-LATE-TUPLE-STREAM
                         TOPOLOGY-BOLTS-TUPLE-TIMESTAMP-MAX-LAG-MS
                         TOPOLOGY-BOLTS-MESSAGE-ID-FIELD-NAME
                         TOPOLOGY-STATE-PROVIDER

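This hunk adds the new key to a list in `executor.clj`. Assuming, as the surrounding code suggests, that the list names the Storm configs a component may set for itself (other known Storm configs in a component's own conf are stripped before it is merged over the topology conf), the effect can be sketched in Java. `ComponentConfMerge` and its parameters are hypothetical, not Storm API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: merge a component's own config over the topology config,
// dropping known Storm keys that are not on the allow-list.
class ComponentConfMerge {
    static Map<String, Object> merge(Map<String, Object> stormConf,
                                     Map<String, Object> componentConf,
                                     Set<String> allKnownConfigs,
                                     Set<String> overridableKeys) {
        Map<String, Object> merged = new HashMap<>(stormConf);
        for (Map.Entry<String, Object> entry : componentConf.entrySet()) {
            String key = entry.getKey();
            // Unknown (user-defined) keys pass through; known keys must be allow-listed.
            if (!allKnownConfigs.contains(key) || overridableKeys.contains(key)) {
                merged.put(key, entry.getValue());
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        String lateKey = "topology.bolts.late.tuple.stream";
        Map<String, Object> componentConf = Collections.singletonMap(lateKey, (Object) "$late");
        Set<String> known = Collections.singleton(lateKey);
        Map<String, Object> empty = Collections.emptyMap();
        // Without the allow-list entry, the per-component value is dropped.
        System.out.println(merge(empty, componentConf, known, Collections.<String>emptySet()).get(lateKey)); // null
        // With it, the executor sees the value set by the builder.
        System.out.println(merge(empty, componentConf, known, known).get(lateKey)); // $late
    }
}
```

Under that assumption, omitting the key from the list would silently discard whatever `withLateTupleStream` stored in the component configuration.
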
http://git-wip-us.apache.org/repos/asf/storm/blob/c49b6f7d/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java
index 21aa010..d4a5125 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -1874,6 +1874,14 @@ public class Config extends HashMap<String, Object> {
     @isString
     public static final String TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME = "topology.bolts.tuple.timestamp.field.name";
 
+    /**
+     * Bolt-specific configuration for windowed bolts to specify the name of the stream on which late tuples are
+     * going to be emitted. This configuration should only be used from the BaseWindowedBolt.withLateTupleStream builder
+     * method, and not as a global parameter, otherwise an IllegalArgumentException will be thrown.
+     */
+    @isString
+    public static final String TOPOLOGY_BOLTS_LATE_TUPLE_STREAM = "topology.bolts.late.tuple.stream";
+
     /*
      * Bolt-specific configuration for windowed bolts to specify the maximum time lag of the tuple timestamp
      * in milliseconds. It means that the tuple timestamps cannot be out of order by more than this amount.

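The Javadoc above says that using this key outside the builder leads to an `IllegalArgumentException`. The two checks that enforce this are added to `WindowedBoltExecutor.prepare` in the next file; restated as a standalone helper they look roughly like this. `LateStreamValidation` is a hypothetical illustration, not a Storm class; the key strings and messages are the ones from this commit.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical restatement of the two prepare-time checks this commit adds to
// WindowedBoltExecutor; the key strings are the Config values from the diff.
class LateStreamValidation {
    static final String TS_FIELD_KEY = "topology.bolts.tuple.timestamp.field.name";
    static final String LATE_STREAM_KEY = "topology.bolts.late.tuple.stream";

    // Returns the late tuple stream id, or null if none is configured.
    static String validate(Map<String, Object> conf, Set<String> declaredStreams) {
        if (!conf.containsKey(TS_FIELD_KEY)) {
            // A late stream only makes sense for timestamp-based windows.
            if (conf.containsKey(LATE_STREAM_KEY)) {
                throw new IllegalArgumentException(
                        "Late tuple stream can be defined only when specifying a timestamp field");
            }
            return null;
        }
        String lateStream = (String) conf.get(LATE_STREAM_KEY);
        // The stream must have been declared, which happens only when the
        // builder method put the key into the component configuration.
        if (lateStream != null && !declaredStreams.contains(lateStream)) {
            throw new IllegalArgumentException(
                    "Stream for late tuples must be defined with the builder method withLateTupleStream");
        }
        return lateStream;
    }
}
```

A topology-wide value would reach windowed bolts without a timestamp field (first check) and bolts that never declared the stream, because `declareOutputFields` reads only the component configuration filled in by `withLateTupleStream` (second check).
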
http://git-wip-us.apache.org/repos/asf/storm/blob/c49b6f7d/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java b/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
index 9439ef2..fe0f4da 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
@@ -23,7 +23,9 @@ import org.apache.storm.spout.CheckpointSpout;
 import org.apache.storm.task.IOutputCollector;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
 import org.apache.storm.windowing.CountEvictionPolicy;
 import org.apache.storm.windowing.CountTriggerPolicy;
 import org.apache.storm.windowing.EvictionPolicy;
@@ -57,12 +59,14 @@ public class WindowedBoltExecutor implements IRichBolt {
     private static final Logger LOG = LoggerFactory.getLogger(WindowedBoltExecutor.class);
     private static final int DEFAULT_WATERMARK_EVENT_INTERVAL_MS = 1000; // 1s
     private static final int DEFAULT_MAX_LAG_MS = 0; // no lag
+    public static final String LATE_TUPLE_FIELD = "late_tuple";
     private final IWindowedBolt bolt;
     private transient WindowedOutputCollector windowedOutputCollector;
     private transient WindowLifecycleListener<Tuple> listener;
     private transient WindowManager<Tuple> windowManager;
     private transient int maxLagMs;
     private transient String tupleTsFieldName;
+    private transient String lateTupleStream;
     private transient TriggerPolicy<Tuple> triggerPolicy;
     private transient EvictionPolicy<Tuple> evictionPolicy;
     // package level for unit tests
@@ -163,6 +167,13 @@ public class WindowedBoltExecutor implements IRichBolt {
         // tuple ts
         if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME)) {
             tupleTsFieldName = (String) stormConf.get(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME);
+            // late tuple stream
+            lateTupleStream = (String) stormConf.get(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM);
+            if (lateTupleStream != null) {
+                if (!context.getThisStreams().contains(lateTupleStream)) {
+                    throw new IllegalArgumentException("Stream for late tuples must be defined with the builder method withLateTupleStream");
+                }
+            }
             // max lag
             if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS)) {
                 maxLagMs = ((Number) stormConf.get(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS)).intValue();
@@ -178,6 +189,10 @@ public class WindowedBoltExecutor implements IRichBolt {
             }
             waterMarkEventGenerator = new WaterMarkEventGenerator<>(manager, watermarkInterval,
                                                                     maxLagMs, getComponentStreams(context));
+        } else {
+            if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM)) {
+                throw new IllegalArgumentException("Late tuple stream can be defined only when specifying a timestamp field");
+            }
         }
         // validate
         validate(stormConf, windowLengthCount, windowLengthDuration,
@@ -268,8 +283,12 @@ public class WindowedBoltExecutor implements IRichBolt {
             if (waterMarkEventGenerator.track(input.getSourceGlobalStreamId(), ts)) {
                 windowManager.add(input, ts);
             } else {
+                if (lateTupleStream != null) {
+                    windowedOutputCollector.emit(lateTupleStream, input, new Values(input));
+                } else {
+                    LOG.info("Received a late tuple {} with ts {}. This will not be processed.", input, ts);
+                }
                 windowedOutputCollector.ack(input);
-                LOG.info("Received a late tuple {} with ts {}. This will not processed.", input, ts);
             }
         } else {
             windowManager.add(input);
@@ -284,6 +303,10 @@ public class WindowedBoltExecutor implements IRichBolt {
 
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        String lateTupleStream = (String) getComponentConfiguration().get(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM);
+        if (lateTupleStream != null) {
+            declarer.declareStream(lateTupleStream, new Fields(LATE_TUPLE_FIELD));
+        }
         bolt.declareOutputFields(declarer);
     }
 

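The late-tuple branch added to `execute()` above either emits the input on the late stream or logs it, and acks it in both cases. A minimal sketch of that decision, assuming a hypothetical `Collector` interface in place of Storm's `OutputCollector` (the class and interface names are illustrative only):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the late-tuple branch that this commit adds to
// WindowedBoltExecutor.execute(); Collector stands in for Storm's collector.
class LateTupleRouter {
    interface Collector {
        void emit(String streamId, Object anchor, List<Object> values);
        void ack(Object input);
    }

    private final String lateTupleStream; // null when no late stream is configured

    LateTupleRouter(String lateTupleStream) {
        this.lateTupleStream = lateTupleStream;
    }

    void onLateTuple(Object input, long ts, Collector collector) {
        if (lateTupleStream != null) {
            // Emit the whole input tuple as the single value, anchored to the input.
            List<Object> values = new ArrayList<>();
            values.add(input);
            collector.emit(lateTupleStream, input, values);
        } else {
            // Default behaviour: log and drop.
            System.out.printf("Received a late tuple %s with ts %d. This will not be processed.%n", input, ts);
        }
        // Acked in both cases, after the anchored emit.
        collector.ack(input);
    }
}
```

Emitting before acking keeps the new tuple anchored to an input that is still pending, which is the same order the diff uses (`emit(lateTupleStream, input, new Values(input))`, then `ack(input)`).
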
http://git-wip-us.apache.org/repos/asf/storm/blob/c49b6f7d/storm-core/src/jvm/org/apache/storm/topology/base/BaseStatefulWindowedBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/base/BaseStatefulWindowedBolt.java b/storm-core/src/jvm/org/apache/storm/topology/base/BaseStatefulWindowedBolt.java
index 850904b..0c35b9d 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/base/BaseStatefulWindowedBolt.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/base/BaseStatefulWindowedBolt.java
@@ -103,6 +103,14 @@ public abstract class BaseStatefulWindowedBolt<T extends State> extends BaseWind
         return this;
     }
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public BaseStatefulWindowedBolt<T> withLateTupleStream(String streamName) {
+        super.withLateTupleStream(streamName);
+        return this;
+    }
 
     /**
      * {@inheritDoc}

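`BaseStatefulWindowedBolt` re-declares `withLateTupleStream` only to narrow its return type. The hypothetical classes below (not Storm API; `withStateName` is made up for illustration) show why a covariant override matters for builder chaining: without it, the chained call would return the parent type and the subclass's own builder methods would no longer be reachable.

```java
// Hypothetical classes illustrating the covariant-return override pattern.
class WindowedBuilder {
    protected String lateTupleStream;

    WindowedBuilder withLateTupleStream(String streamId) {
        this.lateTupleStream = streamId;
        return this;
    }
}

class StatefulWindowedBuilder extends WindowedBuilder {
    String stateName;

    @Override
    StatefulWindowedBuilder withLateTupleStream(String streamId) {
        super.withLateTupleStream(streamId); // reuse the parent logic
        return this;                         // but return the narrower type
    }

    // Hypothetical subclass-only builder method.
    StatefulWindowedBuilder withStateName(String name) {
        this.stateName = name;
        return this;
    }

    static StatefulWindowedBuilder example() {
        // Compiles only because of the covariant override above; otherwise
        // withLateTupleStream would return WindowedBuilder, which lacks withStateName.
        return new StatefulWindowedBuilder().withLateTupleStream("$late").withStateName("counts");
    }
}
```
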
http://git-wip-us.apache.org/repos/asf/storm/blob/c49b6f7d/storm-core/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java b/storm-core/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java
index e1725dd..6a0a93c 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java
@@ -255,6 +255,20 @@ public abstract class BaseWindowedBolt implements IWindowedBolt {
     }
 
     /**
+     * Specify a stream id on which late tuples are going to be emitted. They are going to be accessible via the
+     * {@link org.apache.storm.topology.WindowedBoltExecutor#LATE_TUPLE_FIELD} field.
+     * It must be defined on a per-component basis, and in conjunction with the
+     * {@link BaseWindowedBolt#withTimestampField}, otherwise {@link IllegalArgumentException} will be thrown.
+     *
+     * @param streamId the name of the stream used to emit late tuples on
+     */
+    public BaseWindowedBolt withLateTupleStream(String streamId) {
+        windowConfiguration.put(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM, streamId);
+        return this;
+    }
+
+
+    /**
      * Specify the maximum time lag of the tuple timestamp in milliseconds. It means that the tuple timestamps
      * cannot be out of order by more than this amount.
      *

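`withLateTupleStream` writes the stream id into the bolt's per-component configuration, and `WindowedBoltExecutor.declareOutputFields` reads the same key back to declare a one-field stream. A hypothetical miniature of that round trip (`MiniWindowedBolt` and the `"value"` field of its default stream are illustrative, not Storm code):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical miniature: builder methods write into a per-component map,
// and the output declaration later reads the same map back.
class MiniWindowedBolt {
    static final String LATE_STREAM_KEY = "topology.bolts.late.tuple.stream";
    static final String LATE_TUPLE_FIELD = "late_tuple";

    private final Map<String, Object> windowConfiguration = new HashMap<>();

    MiniWindowedBolt withLateTupleStream(String streamId) {
        windowConfiguration.put(LATE_STREAM_KEY, streamId);
        return this;
    }

    Map<String, Object> getComponentConfiguration() {
        return windowConfiguration;
    }

    // Stand-in for declareOutputFields: returns stream id -> declared field names.
    Map<String, List<String>> declaredStreams() {
        Map<String, List<String>> streams = new LinkedHashMap<>();
        streams.put("default", Collections.singletonList("value")); // hypothetical user output
        String lateStream = (String) getComponentConfiguration().get(LATE_STREAM_KEY);
        if (lateStream != null) {
            // One field holding the original late tuple.
            streams.put(lateStream, Collections.singletonList(LATE_TUPLE_FIELD));
        }
        return streams;
    }
}
```

In a real topology, a downstream bolt would subscribe to that stream with something like `shuffleGrouping("windowed-bolt", "$late")` (the component id is hypothetical) and read the original tuple with `(Tuple) input.getValueByField(WindowedBoltExecutor.LATE_TUPLE_FIELD)`.
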
http://git-wip-us.apache.org/repos/asf/storm/blob/c49b6f7d/storm-core/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java b/storm-core/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java
index 0ba1e24..f6822ce 100644
--- a/storm-core/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java
+++ b/storm-core/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java
@@ -28,18 +28,21 @@ import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.TupleImpl;
 import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Time;
 import org.apache.storm.windowing.TupleWindow;
-import org.apache.storm.windowing.WaterMarkEvent;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
+import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.*;
 
 /**
@@ -87,6 +90,7 @@ public class WindowedBoltExecutorTest {
 
     private TopologyContext getTopologyContext() {
         TopologyContext context = Mockito.mock(TopologyContext.class);
+
         Map<GlobalStreamId, Grouping> sources = Collections.singletonMap(
                 new GlobalStreamId("s1", "default"),
                 null
@@ -139,4 +143,85 @@ public class WindowedBoltExecutorTest {
         assertArrayEquals(new long[]{618, 626},
                           new long[]{(long) third.get().get(0).getValue(0), (long)third.get().get(1).getValue(0)});
     }
+
+    @Test
+    public void testPrepareLateTupleStreamWithoutTs() throws Exception {
+        Map<String, Object> conf = new HashMap<>();
+        conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 100000);
+        conf.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS, 20);
+        conf.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, 10);
+        conf.put(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM, "$late");
+        conf.put(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS, 5);
+        conf.put(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, 10);
+
+        testWindowedBolt = new TestWindowedBolt();
+        executor = new WindowedBoltExecutor(testWindowedBolt);
+        TopologyContext context = getTopologyContext();
+        // emulate the call of withLateTupleStream method
+        Mockito.when(context.getThisStreams()).thenReturn(new HashSet<>(Arrays.asList("default", "$late")));
+        try {
+            executor.prepare(conf, context, getOutputCollector());
+            fail();
+        } catch (IllegalArgumentException e) {
+            assertThat(e.getMessage(), is("Late tuple stream can be defined only when specifying a timestamp field"));
+        }
+    }
+
+
+    @Test
+    public void testPrepareLateTupleStreamWithoutBuilder() throws Exception {
+        Map<String, Object> conf = new HashMap<>();
+        conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 100000);
+        conf.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS, 20);
+        conf.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, 10);
+        conf.put(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME, "ts");
+        conf.put(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM, "$late");
+        conf.put(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS, 5);
+        conf.put(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, 10);
+
+        testWindowedBolt = new TestWindowedBolt();
+        executor = new WindowedBoltExecutor(testWindowedBolt);
+        TopologyContext context = getTopologyContext();
+        try {
+            executor.prepare(conf, context, getOutputCollector());
+            fail();
+        } catch (IllegalArgumentException e) {
+            assertThat(e.getMessage(), is("Stream for late tuples must be defined with the builder method withLateTupleStream"));
+        }
+    }
+
+
+    @Test
+    public void testExecuteWithLateTupleStream() throws Exception {
+        testWindowedBolt = new TestWindowedBolt();
+        executor = new WindowedBoltExecutor(testWindowedBolt);
+        TopologyContext context = getTopologyContext();
+        Mockito.when(context.getThisStreams()).thenReturn(new HashSet<>(Arrays.asList("default", "$late")));
+
+        OutputCollector outputCollector = Mockito.mock(OutputCollector.class);
+        Map<String, Object> conf = new HashMap<>();
+        conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 100000);
+        conf.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS, 20);
+        conf.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, 10);
+        conf.put(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME, "ts");
+        conf.put(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM, "$late");
+        conf.put(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS, 5);
+        conf.put(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, 10);
+        executor.prepare(conf, context, outputCollector);
+
+        long[] timestamps = {603, 605, 607, 618, 626, 636, 600};
+        List<Tuple> tuples = new ArrayList<>(timestamps.length);
+
+        executor.waterMarkEventGenerator.run();
+        for (long ts : timestamps) {
+            Tuple tuple = getTuple("s1", new Fields("ts"), new Values(ts));
+            tuples.add(tuple);
+            executor.execute(tuple);
+            Time.sleep(10);
+        }
+
+        System.out.println(testWindowedBolt.tupleWindows);
+        Tuple tuple = tuples.get(tuples.size() - 1);
+        Mockito.verify(outputCollector).emit("$late", Arrays.asList(tuple), new Values(tuple));
+    }
 }
