Thanks. The ideal case is to fire after the watermark passes each element
in the window, but that requires a custom trigger and FLIP-2 as well. The
enhanced window evictor will help to avoid the last firing.

Are the discussions on FLIP-2 still going on?
Are there any open JIRAs or PRs? (The proposed `ProcessWindowFunction`
will be sufficient for my case.)
Is there a workaround now for my case?

Thanks again for following through on this.
Manu

On Wed, Nov 2, 2016 at 1:07 AM Aljoscha Krettek <aljos...@apache.org> wrote:

> Ah, I finally understand it. You would like a way to query the current
> watermark in the window function to only emit those elements where the
> timestamp is lower than the watermark.
>
> When the window fires again, do you want to emit elements that you emitted
> during the last firing again? If not, I think you also need to use an
> evictor to evict the elements from the window where the timestamp is lower
> than the watermark. With this FLIP
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata
> we should be able to extend the WindowFunction Context to also provide
> the current watermark. With this recent PR
> https://github.com/apache/flink/pull/2736 you would be able to evict
> elements from the window state after the window function was called.
>
> Cheers,
> Aljoscha
>
> On Tue, 1 Nov 2016 at 02:27 Manu Zhang <owenzhang1...@gmail.com> wrote:
>
> Yes, here's the example
> https://github.com/manuzhang/flink/blob/pv/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/session/PageViewSessionWindowing.scala
>
> If you print and compare the timestamp of the timer with that of
> "PageView" in the outputs, you can see what I mean.
>
> I think the recently introduced TimelyFlatMapFunction is close to what I
> want to achieve. It would be great if we could query time information in
> the window function, so I filed
> https://issues.apache.org/jira/browse/FLINK-4953
>
> Thanks for your time.
>
> Manu
>
> On Mon, Oct 31, 2016 at 10:29 PM Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
> Hmm, I don't completely understand what's going on. Could you maybe post
> an example, with the trigger code that shows this behaviour?
>
> Cheers,
> Aljoscha
>
> On Thu, 27 Oct 2016 at 17:12 Manu Zhang <owenzhang1...@gmail.com> wrote:
>
> Hi,
>
> It's what I'm seeing. If timers are not fired at the end of the window, a
> state (in the window) whose timestamp is *after* the timer will also be
> emitted. That's a problem for event-time triggers.
>
> Thanks,
> Manu
>
>
> On Thu, Oct 27, 2016 at 10:29 PM Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
> Hi,
> is that example input/output what you would like to achieve or what you
> are currently seeing with Flink? I think for your use case a custom Trigger
> would be required that works like the event-time trigger but additionally
> registers timers for each element where you want to emit.
>
> Cheers,
> Aljoscha
>
> On Wed, 26 Oct 2016 at 04:04 Manu Zhang <owenzhang1...@gmail.com> wrote:
>
> Hi Aljoscha,
>
> Thanks for your response. My use case is to track user trajectories based
> on page view events when users visit a website. The input would be like a
> list of PageView(userId, url, eventTimestamp) with watermarks (=
> eventTimestamp - duration). I'm trying SessionWindows with some event time
> trigger. Note we can't wait for the end of the session window due to
> latency. Instead, we want to emit the user trajectories whenever a buffered
> PageView's event time is passed by the watermark. I tried
> ContinuousEventTimeTrigger and a custom trigger which sets a timer on each
> element's timestamp. For both triggers I've witnessed a problem like the
> following (e.g. a session gap of 5)
>
> PageView("user1", "http://foo", 1)
> PageView("user1", "http://foo/bar", 2)
> Watermark(1) => emit UserTrajectory("user1", "http://foo -> *http://foo/bar*", [1,6])
> PageView("user1", "http://foo/bar/foobar", 5)
> Watermark(4) => emit UserTrajectory("user1", "http://foo -> http://foo/bar ->
> *http://foo/bar/foobar*", [1, 10])
>
> The urls in bold should not be included since there could be events
> before them that have not arrived yet.
>
>
> Thanks,
> Manu
>
>
> On Tue, Oct 25, 2016 at 1:36 AM Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
> Hi,
> with some additional information we might be able to figure this out
> together. What specific combination of WindowAssigner/Trigger are you using
> for your example and what is the input stream (including watermarks)?
>
> Cheers,
> Aljoscha
>
> On Mon, 24 Oct 2016 at 06:30 Manu Zhang <owenzhang1...@gmail.com> wrote:
>
> Hi,
>
> Say I have a window state of List(("a", 1:00), ("b", 1:03), ("c", 1:06))
> which is triggered to emit when the watermark passes the timestamp of an
> element. For example,
>
> on watermark(1:01), List(("a", 1:00)) is emitted
> on watermark(1:04), List(("a", 1:00), ("b", 1:03)) is emitted
> on watermark(1:07), List(("a", 1:00), ("b", 1:03), ("c", 1:06)) is emitted
>
> It seems that if *("c", 1:06) is processed before watermark(1:04)*
> List(("a", 1:00), ("b", 1:03), ("c", 1:06)) will be emitted on
> watermark(1:04). This is incorrect since there could be elements with
> timestamp between 1:04 and 1:06 that have not arrived yet.
>
> I guess this is because the watermark trigger doesn't check whether an
> element's timestamp has been passed.
>
> Please correct me if any of the above is not right.
>
> Thanks,
> Manu Zhang
>
>
>
>
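
A minimal sketch of the behaviour requested in this thread, in plain Java
with no Flink dependency: at each firing, emit only the buffered elements
whose timestamp the watermark has already passed, and optionally evict them
afterwards so they are not emitted again. The class `WatermarkFilterSketch`,
the `Element` type, and the methods `emittable` and `evictEmitted` are
hypothetical names used for illustration, not Flink APIs. Timestamps are
encoded as minutes (1:00 = 60), and treating `timestamp <= watermark` as
"passed" is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

public class WatermarkFilterSketch {

    /** Hypothetical stand-in for an element buffered in window state. */
    static final class Element {
        final String value;
        final long timestamp;

        Element(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Values of buffered elements whose timestamp the watermark has passed. */
    static List<String> emittable(List<Element> buffer, long watermark) {
        List<String> out = new ArrayList<>();
        for (Element e : buffer) {
            // Assumption: "passed" means timestamp <= watermark.
            if (e.timestamp <= watermark) {
                out.add(e.value);
            }
        }
        return out;
    }

    /** Drops elements already covered by the watermark, so a later firing does not repeat them. */
    static void evictEmitted(List<Element> buffer, long watermark) {
        buffer.removeIf(e -> e.timestamp <= watermark);
    }

    public static void main(String[] args) {
        List<Element> buffer = new ArrayList<>();
        buffer.add(new Element("a", 60)); // 1:00
        buffer.add(new Element("b", 63)); // 1:03
        buffer.add(new Element("c", 66)); // 1:06, buffered before watermark(1:04)

        System.out.println(emittable(buffer, 61)); // [a]
        System.out.println(emittable(buffer, 64)); // [a, b], "c" is held back

        evictEmitted(buffer, 64);                  // optional: avoid re-emitting a and b
        System.out.println(emittable(buffer, 67)); // [c]
    }
}
```

Without the eviction step the output is cumulative, as in the
watermark(1:01)/(1:04)/(1:07) example; with it, each element is emitted
once. In a real job the watermark value would have to come from the window
function's context, which is what FLIP-2 proposes.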
