[FLINK-3401] [streaming] [api breaking] AscendingTimestampExtractor only logs violations of ascending timestamp order.
The user can also explicitly set an 'IgnoringHandler' or a 'FailingHandler', which ignore violations or fail hard, respectively. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e08d7a6f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e08d7a6f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e08d7a6f Branch: refs/heads/master Commit: e08d7a6f30f8e3b845ff33d4266fbd4fad4bec63 Parents: 74c2b80 Author: Stephan Ewen <se...@apache.org> Authored: Mon Feb 15 18:41:15 2016 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Wed Feb 17 11:35:36 2016 +0100 ---------------------------------------------------------------------- .../streaming/examples/join/WindowJoin.java | 4 +- .../examples/windowing/TopSpeedWindowing.java | 2 +- .../functions/AscendingTimestampExtractor.java | 124 +++++++++++++++---- .../api/complex/ComplexIntegrationTest.java | 2 +- .../AscendingTimestampExtractorTest.java | 100 +++++++++++++++ .../streaming/timestamp/TimestampITCase.java | 2 +- .../flink/streaming/api/scala/DataStream.scala | 2 +- 7 files changed, 208 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e08d7a6f/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java index 2afccc8..7c64482 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java @@ -74,8 +74,8 @@ 
public class WindowJoin { DataStream<Tuple3<Long, String, Integer>> salaries = input.f1; // extract the timestamps - grades = grades.assignTimestamps(new MyTimestampExtractor()); - salaries = salaries.assignTimestamps(new MyTimestampExtractor()); + grades = grades.assignTimestampsAndWatermarks(new MyTimestampExtractor()); + salaries = salaries.assignTimestampsAndWatermarks(new MyTimestampExtractor()); // apply a temporal join over the two stream based on the names over one // second windows http://git-wip-us.apache.org/repos/asf/flink/blob/e08d7a6f/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java index b0d1462..9104416 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java @@ -71,7 +71,7 @@ public class TopSpeedWindowing { int evictionSec = 10; double triggerMeters = 50; DataStream<Tuple4<Integer, Integer, Double, Long>> topSpeeds = carData - .assignTimestamps(new CarTimestamp()) + .assignTimestampsAndWatermarks(new CarTimestamp()) .keyBy(0) .window(GlobalWindows.create()) .evictor(TimeEvictor.of(Time.of(evictionSec, TimeUnit.SECONDS))) http://git-wip-us.apache.org/repos/asf/flink/blob/e08d7a6f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractor.java ---------------------------------------------------------------------- diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractor.java index 97f0095..60216b6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractor.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,48 +15,128 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.api.functions; import org.apache.flink.annotation.PublicEvolving; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.util.Objects.requireNonNull; + /** - * Interface for user functions that extract timestamps from elements. The extracting timestamps - * must be monotonically increasing. + * A timestamp assigner and watermark generator for streams where timestamps are monotonously + * ascending. In this case, the local watermarks for the streams are easy to generate, because + * they strictly follow the timestamps. 
* * @param <T> The type of the elements that this function can extract timestamps from */ @PublicEvolving -public abstract class AscendingTimestampExtractor<T> implements TimestampExtractor<T> { - - long currentTimestamp = 0; +public abstract class AscendingTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T> { + + private static final long serialVersionUID = 1L; + + /** The current timestamp */ + private long currentTimestamp = 0; + /** Handler that is called when timestamp monotony is violated */ + private MonotonyViolationHandler violationHandler = new LoggingHandler(); + /** - * Extracts a timestamp from an element. The timestamp must be monotonically increasing. + * Extracts the timestamp from the given element. The timestamp must be monotonically increasing. * * @param element The element that the timestamp is extracted from. - * @param currentTimestamp The current internal timestamp of the element. + * @param previousElementTimestamp The current internal timestamp of the element. + * * @return The new timestamp. */ - public abstract long extractAscendingTimestamp(T element, long currentTimestamp); + public abstract long extractAscendingTimestamp(T element, long previousElementTimestamp); - @Override - public final long extractTimestamp(T element, long currentTimestamp) { - long newTimestamp = extractAscendingTimestamp(element, currentTimestamp); - if (newTimestamp < this.currentTimestamp) { - throw new RuntimeException("Timestamp is lower than previously extracted timestamp. " + - "You should implement a custom TimestampExtractor."); - } - this.currentTimestamp = newTimestamp; - return this.currentTimestamp; + /** + * Sets the handler for violations to the ascending timestamp order. + * + * @param handler The violation handler to use. + * @return This extractor. 
+ */ + public AscendingTimestampExtractor<T> withViolationHandler(MonotonyViolationHandler handler) { + this.violationHandler = requireNonNull(handler); + return this; } - + + // ------------------------------------------------------------------------ + @Override - public final long extractWatermark(T element, long currentTimestamp) { - return Long.MIN_VALUE; + public final long extractTimestamp(T element, long elementPrevTimestamp) { + final long newTimestamp = extractAscendingTimestamp(element, elementPrevTimestamp); + if (newTimestamp >= this.currentTimestamp) { + this.currentTimestamp = newTimestamp; + return newTimestamp; + } else { + violationHandler.handleViolation(newTimestamp, this.currentTimestamp); + return newTimestamp; + } } @Override public final long getCurrentWatermark() { return currentTimestamp - 1; } + + // ------------------------------------------------------------------------ + // Handling violations of monotonous timestamps + // ------------------------------------------------------------------------ + + /** + * Interface for handlers that handle violations of the monotonous ascending timestamps + * property. + */ + public interface MonotonyViolationHandler extends java.io.Serializable { + + /** + * Called when the property of monotonously ascending timestamps is violated, i.e., + * when {@code elementTimestamp < lastTimestamp}. + * + * @param elementTimestamp The timestamp of the current element. + * @param lastTimestamp The last timestamp. + */ + void handleViolation(long elementTimestamp, long lastTimestamp); + } + + /** + * Handler that does nothing when timestamp monotony is violated. + */ + public static final class IgnoringHandler implements MonotonyViolationHandler { + private static final long serialVersionUID = 1L; + + @Override + public void handleViolation(long elementTimestamp, long lastTimestamp) {} + } + + /** + * Handler that fails the program when timestamp monotony is violated. 
+ */ + public static final class FailingHandler implements MonotonyViolationHandler { + private static final long serialVersionUID = 1L; + + @Override + public void handleViolation(long elementTimestamp, long lastTimestamp) { + throw new RuntimeException("Ascending timestamps condition violated. Element timestamp " + + elementTimestamp + " is smaller than last timestamp " + lastTimestamp); + } + } + + /** + * Handler that only logs violations of timestamp monotony, on WARN log level. + */ + public static final class LoggingHandler implements MonotonyViolationHandler { + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(AscendingTimestampExtractor.class); + + @Override + public void handleViolation(long elementTimestamp, long lastTimestamp) { + LOG.warn("Timestamp monotony violated: {} < {}", elementTimestamp, lastTimestamp); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/e08d7a6f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java index 8c29a15..41df4e6 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java @@ -205,7 +205,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase { DataStream<OuterPojo> sourceStream22 = env.addSource(new PojoSource()); sourceStream21 - .assignTimestamps(new MyTimestampExtractor()) + .assignTimestampsAndWatermarks(new MyTimestampExtractor()) .keyBy(2, 2) .timeWindow(Time.of(10, TimeUnit.MILLISECONDS), Time.of(4, TimeUnit.MILLISECONDS)) 
.maxBy(3) http://git-wip-us.apache.org/repos/asf/flink/blob/e08d7a6f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractorTest.java new file mode 100644 index 0000000..4f1eeb9 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractorTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class AscendingTimestampExtractorTest { + + @Test + public void testWithFailingHandler() { + AscendingTimestampExtractor<Long> extractor = new LongExtractor() + .withViolationHandler(new AscendingTimestampExtractor.FailingHandler()); + + runValidTests(extractor); + try { + runInvalidTest(extractor); + fail("should fail with an exception"); + } catch (Exception ignored) {} + } + + @Test + public void testWithIgnoringHandler() { + AscendingTimestampExtractor<Long> extractor = new LongExtractor() + .withViolationHandler(new AscendingTimestampExtractor.IgnoringHandler()); + + runValidTests(extractor); + runInvalidTest(extractor); + } + + @Test + public void testWithLoggingHandler() { + AscendingTimestampExtractor<Long> extractor = new LongExtractor() + .withViolationHandler(new AscendingTimestampExtractor.LoggingHandler()); + + runValidTests(extractor); + runInvalidTest(extractor); + } + + @Test + public void testWithDefaultHandler() { + AscendingTimestampExtractor<Long> extractor = new LongExtractor(); + + runValidTests(extractor); + runInvalidTest(extractor); + } + + // ------------------------------------------------------------------------ + + private void runValidTests(AscendingTimestampExtractor<Long> extractor) { + assertEquals(13L, extractor.extractTimestamp(13L, -1L)); + assertEquals(13L, extractor.extractTimestamp(13L, 0L)); + assertEquals(14L, extractor.extractTimestamp(14L, 0L)); + assertEquals(20L, extractor.extractTimestamp(20L, 0L)); + assertEquals(20L, extractor.extractTimestamp(20L, 0L)); + assertEquals(20L, extractor.extractTimestamp(20L, 0L)); + assertEquals(500L, extractor.extractTimestamp(500L, 0L)); + + assertEquals(Long.MAX_VALUE - 1, extractor.extractTimestamp(Long.MAX_VALUE - 1, 99999L)); + + + } + + private void runInvalidTest(AscendingTimestampExtractor<Long> 
extractor) { + assertEquals(1000L, extractor.extractTimestamp(1000L, 100)); + assertEquals(1000L, extractor.extractTimestamp(1000L, 100)); + + // violation + assertEquals(999L, extractor.extractTimestamp(999L, 100)); + } + + // ------------------------------------------------------------------------ + + private static class LongExtractor extends AscendingTimestampExtractor<Long> { + private static final long serialVersionUID = 1L; + + @Override + public long extractAscendingTimestamp(Long element, long currentTimestamp) { + return element; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e08d7a6f/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java index 677636a..1416104 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java @@ -260,7 +260,7 @@ public class TimestampITCase { public void cancel() {} }); - DataStream<Integer> extractOp = source1.assignTimestamps( + DataStream<Integer> extractOp = source1.assignTimestampsAndWatermarks( new AscendingTimestampExtractor<Integer>() { @Override public long extractAscendingTimestamp(Integer element, long currentTimestamp) { http://git-wip-us.apache.org/repos/asf/flink/blob/e08d7a6f/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index 4b019f9..0f6ec7d 
100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -726,7 +726,7 @@ class DataStream[T](stream: JavaStream[T]) { cleanExtractor(element) } } - stream.assignTimestamps(extractorFunction) + stream.assignTimestampsAndWatermarks(extractorFunction) } /**