[FLINK-3401] [streaming] [api breaking] AscendingTimestampExtractor only logs 
violations of ascending timestamp order.

The user can also explicitly set an 'IgnoringHandler' or a 'FailingHandler',
which respectively do nothing on violations or fail hard.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e08d7a6f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e08d7a6f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e08d7a6f

Branch: refs/heads/master
Commit: e08d7a6f30f8e3b845ff33d4266fbd4fad4bec63
Parents: 74c2b80
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Feb 15 18:41:15 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 17 11:35:36 2016 +0100

----------------------------------------------------------------------
 .../streaming/examples/join/WindowJoin.java     |   4 +-
 .../examples/windowing/TopSpeedWindowing.java   |   2 +-
 .../functions/AscendingTimestampExtractor.java  | 124 +++++++++++++++----
 .../api/complex/ComplexIntegrationTest.java     |   2 +-
 .../AscendingTimestampExtractorTest.java        | 100 +++++++++++++++
 .../streaming/timestamp/TimestampITCase.java    |   2 +-
 .../flink/streaming/api/scala/DataStream.scala  |   2 +-
 7 files changed, 208 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e08d7a6f/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
index 2afccc8..7c64482 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
@@ -74,8 +74,8 @@ public class WindowJoin {
                DataStream<Tuple3<Long, String, Integer>> salaries = input.f1;
 
                // extract the timestamps
-               grades = grades.assignTimestamps(new MyTimestampExtractor());
-               salaries = salaries.assignTimestamps(new 
MyTimestampExtractor());
+               grades = grades.assignTimestampsAndWatermarks(new 
MyTimestampExtractor());
+               salaries = salaries.assignTimestampsAndWatermarks(new 
MyTimestampExtractor());
 
                // apply a temporal join over the two stream based on the names 
over one
                // second windows

http://git-wip-us.apache.org/repos/asf/flink/blob/e08d7a6f/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
index b0d1462..9104416 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
@@ -71,7 +71,7 @@ public class TopSpeedWindowing {
                int evictionSec = 10;
                double triggerMeters = 50;
                DataStream<Tuple4<Integer, Integer, Double, Long>> topSpeeds = 
carData
-                               .assignTimestamps(new CarTimestamp())
+                               .assignTimestampsAndWatermarks(new 
CarTimestamp())
                                .keyBy(0)
                                .window(GlobalWindows.create())
                                .evictor(TimeEvictor.of(Time.of(evictionSec, 
TimeUnit.SECONDS)))

http://git-wip-us.apache.org/repos/asf/flink/blob/e08d7a6f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractor.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractor.java
index 97f0095..60216b6 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractor.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,48 +15,128 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.api.functions;
 
 import org.apache.flink.annotation.PublicEvolving;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Objects.requireNonNull;
+
 /**
- * Interface for user functions that extract timestamps from elements. The 
extracting timestamps
- * must be monotonically increasing.
+ * A timestamp assigner and watermark generator for streams where timestamps 
are monotonically
+ * ascending. In this case, the local watermarks for the streams are easy to 
generate, because
+ * they strictly follow the timestamps.
  *
  * @param <T> The type of the elements that this function can extract 
timestamps from
  */
 @PublicEvolving
-public abstract class AscendingTimestampExtractor<T> implements 
TimestampExtractor<T> {
-
-       long currentTimestamp = 0;
+public abstract class AscendingTimestampExtractor<T> implements 
AssignerWithPeriodicWatermarks<T> {
+       
+       private static final long serialVersionUID = 1L;
+       
+       /** The current timestamp */
+       private long currentTimestamp = 0;
 
+       /** Handler that is called when timestamp monotony is violated */
+       private MonotonyViolationHandler violationHandler = new 
LoggingHandler();
+       
        /**
-        * Extracts a timestamp from an element. The timestamp must be 
monotonically increasing.
+        * Extracts the timestamp from the given element. The timestamp must be 
monotonically increasing.
         *
         * @param element The element that the timestamp is extracted from.
-        * @param currentTimestamp The current internal timestamp of the 
element.
+        * @param previousElementTimestamp The timestamp previously attached to 
the element (passed through unchanged from extractTimestamp).
+        * 
         * @return The new timestamp.
         */
-       public abstract long extractAscendingTimestamp(T element, long 
currentTimestamp);
+       public abstract long extractAscendingTimestamp(T element, long 
previousElementTimestamp);
 
-       @Override
-       public final long extractTimestamp(T element, long currentTimestamp) {
-               long newTimestamp = extractAscendingTimestamp(element, 
currentTimestamp);
-               if (newTimestamp < this.currentTimestamp) {
-                       throw new RuntimeException("Timestamp is lower than 
previously extracted timestamp. " +
-                                       "You should implement a custom 
TimestampExtractor.");
-               }
-               this.currentTimestamp = newTimestamp;
-               return this.currentTimestamp;
+       /**
+        * Sets the handler for violations to the ascending timestamp order.
+        * 
+        * @param handler The violation handler to use.
+        * @return This extractor.
+        */
+       public AscendingTimestampExtractor<T> 
withViolationHandler(MonotonyViolationHandler handler) {
+               this.violationHandler = requireNonNull(handler);
+               return this;
        }
-
+       
+       // 
------------------------------------------------------------------------
+       
        @Override
-       public final long extractWatermark(T element, long currentTimestamp) {
-               return Long.MIN_VALUE;
+       public final long extractTimestamp(T element, long 
elementPrevTimestamp) {
+               final long newTimestamp = extractAscendingTimestamp(element, 
elementPrevTimestamp);
+               if (newTimestamp >= this.currentTimestamp) {
+                       this.currentTimestamp = newTimestamp;
+                       return newTimestamp;
+               } else {
+                       violationHandler.handleViolation(newTimestamp, 
this.currentTimestamp);
+                       return newTimestamp;
+               }
        }
 
        @Override
        public final long getCurrentWatermark() {
                return currentTimestamp - 1;
        }
+
+       // 
------------------------------------------------------------------------
+       //  Handling violations of monotonous timestamps
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Interface for handlers that handle violations of the monotonous 
ascending timestamps
+        * property.
+        */
+       public interface MonotonyViolationHandler extends java.io.Serializable {
+
+               /**
+                * Called when the property of monotonously ascending 
timestamps is violated, i.e.,
+                * when {@code elementTimestamp < lastTimestamp}.
+                * 
+                * @param elementTimestamp The timestamp of the current element.
+                * @param lastTimestamp The last timestamp.
+                */
+               void handleViolation(long elementTimestamp, long lastTimestamp);
+       }
+
+       /**
+        * Handler that does nothing when timestamp monotony is violated.
+        */
+       public static final class IgnoringHandler implements 
MonotonyViolationHandler {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void handleViolation(long elementTimestamp, long 
lastTimestamp) {}
+       }
+
+       /**
+        * Handler that fails the program when timestamp monotony is violated.
+        */
+       public static final class FailingHandler implements 
MonotonyViolationHandler {
+               private static final long serialVersionUID = 1L;
+               
+               @Override
+               public void handleViolation(long elementTimestamp, long 
lastTimestamp) {
+                       throw new RuntimeException("Ascending timestamps 
condition violated. Element timestamp "
+                                       + elementTimestamp + " is smaller than 
last timestamp " + lastTimestamp);
+               }
+       }
+
+       /**
+        * Handler that only logs violations of timestamp monotony, on WARN log 
level.
+        */
+       public static final class LoggingHandler implements 
MonotonyViolationHandler {
+               private static final long serialVersionUID = 1L;
+
+               private static final Logger LOG = 
LoggerFactory.getLogger(AscendingTimestampExtractor.class);
+               
+               @Override
+               public void handleViolation(long elementTimestamp, long 
lastTimestamp) {
+                       LOG.warn("Timestamp monotony violated: {} < {}", 
elementTimestamp, lastTimestamp);
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e08d7a6f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
index 8c29a15..41df4e6 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
@@ -205,7 +205,7 @@ public class ComplexIntegrationTest extends 
StreamingMultipleProgramsTestBase {
                DataStream<OuterPojo> sourceStream22 = env.addSource(new 
PojoSource());
 
                sourceStream21
-                               .assignTimestamps(new MyTimestampExtractor())
+                               .assignTimestampsAndWatermarks(new 
MyTimestampExtractor())
                                .keyBy(2, 2)
                                .timeWindow(Time.of(10, TimeUnit.MILLISECONDS), 
Time.of(4, TimeUnit.MILLISECONDS))
                                .maxBy(3)

http://git-wip-us.apache.org/repos/asf/flink/blob/e08d7a6f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractorTest.java
new file mode 100644
index 0000000..4f1eeb9
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractorTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class AscendingTimestampExtractorTest {
+       
+       @Test
+       public void testWithFailingHandler() {
+               AscendingTimestampExtractor<Long> extractor = new 
LongExtractor()
+                               .withViolationHandler(new 
AscendingTimestampExtractor.FailingHandler());
+               
+               runValidTests(extractor);
+               try {
+                       runInvalidTest(extractor);
+                       fail("should fail with an exception");
+               } catch (Exception ignored) {}
+       }
+
+       @Test
+       public void testWithIgnoringHandler() {
+               AscendingTimestampExtractor<Long> extractor = new 
LongExtractor()
+                               .withViolationHandler(new 
AscendingTimestampExtractor.IgnoringHandler());
+
+               runValidTests(extractor);
+               runInvalidTest(extractor);
+       }
+
+       @Test
+       public void testWithLoggingHandler() {
+               AscendingTimestampExtractor<Long> extractor = new 
LongExtractor()
+                               .withViolationHandler(new 
AscendingTimestampExtractor.LoggingHandler());
+
+               runValidTests(extractor);
+               runInvalidTest(extractor);
+       }
+
+       @Test
+       public void testWithDefaultHandler() {
+               AscendingTimestampExtractor<Long> extractor = new 
LongExtractor();
+
+               runValidTests(extractor);
+               runInvalidTest(extractor);
+       }
+       
+       // 
------------------------------------------------------------------------
+       
+       private void runValidTests(AscendingTimestampExtractor<Long> extractor) 
{
+               assertEquals(13L, extractor.extractTimestamp(13L, -1L));
+               assertEquals(13L, extractor.extractTimestamp(13L, 0L));
+               assertEquals(14L, extractor.extractTimestamp(14L, 0L));
+               assertEquals(20L, extractor.extractTimestamp(20L, 0L));
+               assertEquals(20L, extractor.extractTimestamp(20L, 0L));
+               assertEquals(20L, extractor.extractTimestamp(20L, 0L));
+               assertEquals(500L, extractor.extractTimestamp(500L, 0L));
+               
+               assertEquals(Long.MAX_VALUE - 1, 
extractor.extractTimestamp(Long.MAX_VALUE - 1, 99999L));
+               
+               
+       }
+       
+       private void runInvalidTest(AscendingTimestampExtractor<Long> 
extractor) {
+               assertEquals(1000L, extractor.extractTimestamp(1000L, 100));
+               assertEquals(1000L, extractor.extractTimestamp(1000L, 100));
+               
+               // violation
+               assertEquals(999L, extractor.extractTimestamp(999L, 100));
+       }
+
+       // 
------------------------------------------------------------------------
+       
+       private static class LongExtractor extends 
AscendingTimestampExtractor<Long> {
+               private static final long serialVersionUID = 1L;
+               
+               @Override
+               public long extractAscendingTimestamp(Long element, long 
currentTimestamp) {
+                       return element;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e08d7a6f/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
index 677636a..1416104 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
@@ -260,7 +260,7 @@ public class TimestampITCase {
                        public void cancel() {}
                });
 
-               DataStream<Integer> extractOp = source1.assignTimestamps(
+               DataStream<Integer> extractOp = 
source1.assignTimestampsAndWatermarks(
                                new AscendingTimestampExtractor<Integer>() {
                                        @Override
                                        public long 
extractAscendingTimestamp(Integer element, long currentTimestamp) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e08d7a6f/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 4b019f9..0f6ec7d 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -726,7 +726,7 @@ class DataStream[T](stream: JavaStream[T]) {
         cleanExtractor(element)
       }
     }
-    stream.assignTimestamps(extractorFunction)
+    stream.assignTimestampsAndWatermarks(extractorFunction)
   }
 
   /**

Reply via email to