I'm afraid the doc is wrong here. The JavaDoc on Watermark says this about
watermarks:

"A Watermark tells operators that receive it that no elements with a
timestamp older or equal to the watermark timestamp should arrive at the
operator."

The system also relies on this fact, as visible in how timers are read from
the watermark timers queue and in AscendingTimestampExtractor, which has
this code:

public final Watermark getCurrentWatermark() {
    return new Watermark(currentTimestamp == Long.MIN_VALUE ?
            Long.MIN_VALUE : currentTimestamp - 1);
}

Notice how the watermark is "currentTimestamp - 1", where currentTimestamp
is the highest timestamp seen so far and timestamps are assumed to be
monotonically ascending.
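
To make the consequence concrete, here is a minimal sketch in plain Java. It
is a simplified model of a single event-time window, not Flink's actual
WindowOperator: the class WatermarkSketch, the method firstFiredSum and the
"offset" parameter are hypothetical names made up for illustration. It only
assumes the two rules quoted in this thread (an element with timestamp <=
watermark is late, and a timer fires when its timestamp <= watermark), plus
the assumption that a final watermark at the end of the input fires whatever
is still buffered.

```java
import java.time.Instant;
import java.util.Arrays;
import java.util.List;

// Simplified model of one event-time window; NOT Flink's WindowOperator.
class WatermarkSketch {

    // Hypothetical helper: feeds elements that all carry timestamp `ts` into a
    // window with max timestamp `windowMax`. After each element a punctuated
    // watermark of (ts - offset) is emitted. Returns the sum the window holds
    // when it fires for the first time.
    static long firstFiredSum(List<Integer> values, long ts, long windowMax, long offset) {
        long watermark = Long.MIN_VALUE;
        long sum = 0;
        for (int v : values) {
            // JavaDoc rule: no element with timestamp <= watermark should arrive.
            if (ts <= watermark) {
                continue; // treated as late and dropped
            }
            sum += v;
            watermark = Math.max(watermark, ts - offset);
            // Timer rule quoted from WindowOperator#processWatermark.
            if (windowMax <= watermark) {
                return sum; // window fires now
            }
        }
        // Assumption: an end-of-input watermark fires the remaining contents.
        return sum;
    }

    public static void main(String[] args) {
        long ts = Instant.parse("2016-12-19T10:59:59.999Z").toEpochMilli();
        List<Integer> values = Arrays.asList(100, 200, 300);

        // Watermark equal to the element timestamp, as in the reported assigner.
        System.out.println(firstFiredSum(values, ts, ts, 0)); // prints 100

        // Watermark of timestamp - 1, as in AscendingTimestampExtractor.
        System.out.println(firstFiredSum(values, ts, ts, 1)); // prints 600
    }
}
```

Under this model, an assigner that returns new Watermark(extractedTimestamp - 1)
keeps all three elements in the same window, while returning
new Watermark(extractedTimestamp) fires the window after the first element.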

Cheers,
Aljoscha

On Tue, 20 Dec 2016 at 15:28 Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Jaromir,
>
> thank you very much for reporting this issue.
> The behavior you are describing is not in line with the documentation of
> watermarks [1] which clearly states that a watermark of time t tells the
> system that no more events with a timestamp < t will occur (otherwise they
> would be considered as late events). Hence, events with a timestamp = t as
> in your case should be OK and not be considered late.
>
> I think this is not intended and probably a bug.
>
> I'll loop in some contributors who are more familiar with watermarks and
> event-time (cc Aljoscha, Kostas K, Stephan).
>
> Best, Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/event_time.html#event-time-and-watermarks
>
> 2016-12-20 14:56 GMT+01:00 Jaromir Vanek <vanek.jaro...@gmail.com>:
>
> > Hi,
> >
> > I am using Flink 1.1.3 and the following example doesn't work as I
> > expected.
> >
> > I've got three input elements with the same timestamp (equal to the
> > window's maxTimestamp). I'm using the /event time/ notion of time with
> > /TumblingEventTimeWindows/.
> >
> > I would expect all three elements to be processed in the same window,
> > because they all have the identical event time timestamp. But the result
> > I'm getting is just the first element, which triggers the window. The
> > remaining elements are considered late and are discarded.
> >
> > From my point of view this is definitely not correct and should be fixed.
> > Could you clarify whether this is correct behavior or a bug?
> >
> > I think the problem is in /WindowOperator#processWatermark/. A timer
> > should be fired if and only if the current watermark is strictly larger
> > than the registered timer's timestamp.
> >
> > /
> > Timer<K, W> timer = watermarkTimersQueue.peek();
> > if (timer != null && timer.timestamp <= mark.getTimestamp()) {
> > /
> >
> > Thanks
> > Jaromir Vanek
> >
> > /
> > public class WindowingTest {
> >
> >   public static void main(String[] args) throws Exception {
> >     StreamExecutionEnvironment env =
> >             StreamExecutionEnvironment.createLocalEnvironment();
> >
> >     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> >
> >     List<Tuple2<Instant, Integer>> elements = Arrays.asList(
> >             new Tuple2<>(Instant.parse("2016-12-19T10:59:59.999Z"), 100),
> >             new Tuple2<>(Instant.parse("2016-12-19T10:59:59.999Z"), 200),
> >             new Tuple2<>(Instant.parse("2016-12-19T10:59:59.999Z"), 300)
> >     );
> >
> >     DataStreamSource<Tuple2<Instant, Integer>> input =
> >             env.fromCollection(elements);
> >
> >     SingleOutputStreamOperator<Tuple2<Instant, Integer>> timestamped =
> >             input.assignTimestampsAndWatermarks(new PunctuatedAssigner());
> >
> >     timestamped.timeWindowAll(Time.minutes(1))
> >          .sum(1)
> >          .print();
> >
> >     // printed result
> >     // (2016-12-19T10:59:59.999Z,100)
> >
> >     env.execute();
> >   }
> >
> >   private static class PunctuatedAssigner
> >           implements AssignerWithPunctuatedWatermarks<Tuple2<Instant, Integer>> {
> >
> >     @Override
> >     public long extractTimestamp(Tuple2<Instant, Integer> element,
> >             long previousElementTimestamp) {
> >       return element.f0.toEpochMilli();
> >     }
> >
> >     @Override
> >     public Watermark checkAndGetNextWatermark(Tuple2<Instant, Integer> lastElement,
> >             long extractedTimestamp) {
> >       return new Watermark(extractedTimestamp);
> >     }
> >   }
> > }
> > /
> >
> >
> >
> > --
> > View this message in context:
> > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Flink-gives-incorrect-result-when-event-time-windowing-used-tp15058.html
> > Sent from the Apache Flink Mailing List archive at Nabble.com.
> >
>