Hi,

On Fri, May 10, 2019 at 16:55, an0 <an0...@gmail.com> wrote:

> > Q2: after a, map(A), and map(B) would work fine. Assigning watermarks
> > immediately after a keyBy() is not a good idea, because 1) the records are
> > shuffled and it's hard to reason about ordering, and 2) you lose the
> > KeyedStream property and would have to keyBy() again (unless you use
> > reinterpretAsKeyedStream).
>
> I know it is better to assignTimestampsAndWatermarks as early as possible.
> I intentionally put it after keyBy to check my understanding of this
> specific situation because I may have to use assignTimestampsAndWatermarks
> after keyBy in my application. I didn't make my question's intention clear
> though.
>
> I'm glad to learn another trick from you: reinterpretAsKeyedStream :).
> Let's assume it is
> keyBy.assignTimestampsAndWatermarks.reinterpretAsKeyedStream.timeWindow(C).
>
> I wanted to ask:
> Because it is using keyed windows, every key's window is fired
> independently. So even if I assignTimestampsAndWatermarks after keyBy, and
> C.2 doesn't have any data and so generates no watermarks, that won't affect
> the firing of C.1's windows. Is this understanding correct?
>
Yes, that is correct. A window operator task evaluates a window when its
event time (advanced by watermarks) triggers the computation. Window
operator tasks do not synchronize their computation. However, they are
usually synchronized due to the broadcasted watermarks.
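
To make the firing rule concrete, here is a minimal sketch in plain Java. It
is a simplified model and not Flink code: WindowTaskSketch, registerWindow
and onWatermark are hypothetical names chosen for illustration. It assumes
that a window [start, end) fires once the task's watermark reaches end - 1,
which, as far as I know, is how the default event-time trigger behaves.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical model of a single window operator task (not Flink code). */
final class WindowTaskSketch {
    // Exclusive end timestamps of the windows that have not fired yet.
    private final List<Long> pendingWindowEnds = new ArrayList<>();
    // The task's event-time clock; it only moves when a watermark arrives.
    private long currentWatermark = Long.MIN_VALUE;

    void registerWindow(long endExclusive) {
        pendingWindowEnds.add(endExclusive);
    }

    /** Advances the clock and returns the ends of all windows that fire. */
    List<Long> onWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        if (watermark <= currentWatermark) {
            return fired; // event time never moves backwards
        }
        currentWatermark = watermark;
        Iterator<Long> it = pendingWindowEnds.iterator();
        while (it.hasNext()) {
            long end = it.next();
            // A window [start, end) is complete once the watermark reaches end - 1.
            if (watermark >= end - 1) {
                fired.add(end);
                it.remove();
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        WindowTaskSketch c1 = new WindowTaskSketch();
        WindowTaskSketch c2 = new WindowTaskSketch();
        c1.registerWindow(1_000L);
        c2.registerWindow(1_000L);
        // Only C.1 sees a watermark; C.2's clock stays where it was.
        System.out.println("C.1 fired: " + c1.onWatermark(999L));
        System.out.println("C.2 still pending: " + c2.pendingWindowEnds);
    }
}
```

Each task keeps its own clock, so C.1 fires as soon as its own watermark
passes the window end, whether or not C.2 has seen anything.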


> > Although C.2 does not receive data, it receives watermarks because WMs are
> > broadcasted. They flow to any task that is reachable by any event. The case
> > that all keys fall into C.1 is not important because a record for C.2 might
> > arrive at any point in time. Also, Flink does not give any guarantees
> > about how keys (or rather key groups) are assigned to tasks. If you rescale
> > the application to a parallelism of 3, the active key group might be
> > scheduled to C.2 or C.3.
> >
> > Long story short, D makes progress in event time because watermarks are
> > broadcasted.
>
> I know watermarks are broadcasted. But I'm using
> assignTimestampsAndWatermarks after keyBy, so C.2 doesn't receive
> watermarks; it creates watermarks from its received data. Since it doesn't
> receive any data, it doesn't create any watermarks. D couldn't make
> progress because one of its inputs, C.2, doesn't make progress. Is this
> understanding correct?
>
Yes, I think that's correct. A timestamp assigner / watermark generator
swallows all WMs it receives and generates new ones. However, if the
assigner does not see any records, the WM will not move forward.
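
The difference between the two placements can be sketched in plain Java. This
is a simplified model under stated assumptions, not Flink code:
WatermarkSketch, combine and assignerWatermark are hypothetical names, the
assigner is modelled as "largest timestamp seen minus the allowed
out-of-orderness", and an assigner that has seen no records is modelled as
staying at Long.MIN_VALUE.

```java
import java.util.Arrays;

/** Hypothetical model of watermark handling; the names are illustrative, not Flink API. */
final class WatermarkSketch {
    static final long NO_WATERMARK = Long.MIN_VALUE;

    /** An operator's event time is the minimum over the watermarks of all its inputs. */
    static long combine(long... inputWatermarks) {
        return Arrays.stream(inputWatermarks).min().orElse(NO_WATERMARK);
    }

    /** Assigner model: watermark = largest timestamp seen - allowed out-of-orderness. */
    static long assignerWatermark(long maxOutOfOrderness, long... timestamps) {
        if (timestamps.length == 0) {
            return NO_WATERMARK; // no records seen, so the watermark never moves
        }
        long maxTimestamp = Arrays.stream(timestamps).max().getAsLong();
        return maxTimestamp - maxOutOfOrderness;
    }

    public static void main(String[] args) {
        long delay = 1_000L;

        // Assigner after keyBy: C.1 sees every record, C.2 sees none.
        long c1 = assignerWatermark(delay, 10_000L, 12_000L, 15_000L);
        long c2 = assignerWatermark(delay);
        System.out.println("assigner after keyBy:  D is at " + combine(c1, c2));

        // Assigner before keyBy: both upstream tasks see records and broadcast
        // their watermarks, so C.1 and C.2 advance even if C.2 gets no records.
        long b1 = assignerWatermark(delay, 10_000L, 15_000L);
        long b2 = assignerWatermark(delay, 12_000L);
        long c1Forwarded = combine(b1, b2);
        long c2Forwarded = combine(b1, b2);
        System.out.println("assigner before keyBy: D is at " + combine(c1Forwarded, c2Forwarded));
    }
}
```

In the first case the minimum is pinned by the idle C.2, so D never advances;
in the second case every C task receives both upstream watermarks and D
follows the slower of the two.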


> On 2019/05/10 10:38:35, Fabian Hueske <fhue...@gmail.com> wrote:
> > Hi,
> >
> > Again answers below ;-)
> >
> > On Thu, May 9, 2019 at 17:12, an0 <an0...@gmail.com> wrote:
> >
> > > You are right, thanks. But something is still not totally clear to me.
> > > I'll reuse your diagram with a little modification:
> > >
> > > DataStream<X> a = ...
> > > a.map(A).map(B).keyBy(....).timeWindow(C)
> > >
> > > and execute this with parallelism 2. However, keyBy only generates one
> > > single key value, and assume they all go to C.1. Does the data flow look
> > > like this?
> > >
> > > A.1 -- B.1 -----/-- C.1
> > >                     /
> > > A.2 -- B.2 --/       C.2
> > >
> > > Will the lack of data into C.2 prevent C.1's windows from firing? Will the
> > > location of assignTimestampsAndWatermarks (after a, after map(A), after
> > > map(B), after keyBy) matter for the firing of C.1's windows?
> > >
> > > By my understanding, the answers are "no" and "no". Correct?
> > >
> > Q1: no. Watermarks are propagated even in case of skewed key distribution.
> > C.2 will also advance its event-time clock (based on the WMs) and forward
> > new WMs.
> > Q2: after a, map(A), and map(B) would work fine. Assigning watermarks
> > immediately after a keyBy() is not a good idea, because 1) the records are
> > shuffled and it's hard to reason about ordering, and 2) you lose the
> > KeyedStream property and would have to keyBy() again (unless you use
> > reinterpretAsKeyedStream).
> >
> >
> > > Now comes the *silly* question: does C.2 exist at all? Since there is only
> > > one key value, only one C instance is needed. I could see that C.2 as a
> > > physical instance may exist, but as a logical instance it shouldn't exist
> > > in the diagram because it is unused. I feel the answer to this silly
> > > question may be the most important in understanding my (and perhaps many
> > > others') misunderstanding of situations like this.
> > >
> > > If C.2 exists just because parallelism is set to 2, even though it is not
> > > logically needed, and it also serves as an input to the next operator if
> > > there is one, then the mystery is completely solved for me.
> > >
> > C.2 exists. Flink does not create a flow topology based on data values. As
> > soon as there is a record with a key that would need to go to C.2, we would
> > need it.
> >
> >
> > > Use a concrete example:
> > >
> > > DataStream<X> a = ...
> > >
> > > a.map(A).map(B).keyBy(....).assignTimestampsAndWatermarks(C).timeWindowAll(D)
> > >
> > > A.1 -- B.1 -----/-- C.1 --\
> > >                     /                 D
> > > A.2 -- B.2 --/       C.2 --/
> > >
> > > D's watermark cannot progress because C.2's watermark cannot progress,
> > > because C.2 doesn't have any input data. Even though C.2 is not logically
> > > needed, it does exist and it ruins everything :p. Is this understanding
> > > correct?
> > >
> >
> > Although C.2 does not receive data, it receives watermarks because WMs are
> > broadcasted. They flow to any task that is reachable by any event. The case
> > that all keys fall into C.1 is not important because a record for C.2 might
> > arrive at any point in time. Also, Flink does not give any guarantees
> > about how keys (or rather key groups) are assigned to tasks. If you rescale
> > the application to a parallelism of 3, the active key group might be
> > scheduled to C.2 or C.3.
> >
> > Long story short, D makes progress in event time because watermarks are
> > broadcasted.
> >
> >
> > >
> > > On 2019/05/09 10:01:44, Fabian Hueske <fhue...@gmail.com> wrote:
> > > > Hi,
> > > >
> > > > Please find my response below.
> > > >
> > > > > On Fri, May 3, 2019 at 16:16, an0 <an0...@gmail.com> wrote:
> > > >
> > > > > > Thanks, but it doesn't seem to cover this rule:
> > > > > > --- Quote
> > > > > > Watermarks are generated at, or directly after, source functions. Each
> > > > > > parallel subtask of a source function usually generates its watermarks
> > > > > > independently. These watermarks define the event time at that particular
> > > > > > parallel source.
> > > > > >
> > > > > > As the watermarks flow through the streaming program, they advance the
> > > > > > event time at the operators where they arrive. Whenever an operator
> > > > > > advances its event time, it generates a new watermark downstream for its
> > > > > > successor operators.
> > > > > >
> > > > > > Some operators consume multiple input streams; a union, for example, or
> > > > > > operators following a keyBy(…) or partition(…) function. Such an operator’s
> > > > > > current event time is the minimum of its input streams’ event times. As its
> > > > > > input streams update their event times, so does the operator.
> > > > > > --- End Quote
> > > > >
> > > > > > The most relevant part, I believe, is this:
> > > > > > "Some operators consume multiple input streams…operators following a
> > > > > > keyBy(…) function. Such an operator’s current event time is the minimum of
> > > > > > its input streams’ event times."
> > > > > >
> > > > > > But the wording of "current event time is the minimum of its input
> > > > > > streams’ event times" actually implies that the input streams (produced by
> > > > > > keyBy) have different watermarks, the exact opposite of what you just
> > > > > > explained.
> > > > >
> > > > >
> > > > > IMO, the description in the documentation is correct, but looks at the
> > > > > issue from a different angle.
> > > > > An operator task typically has many inputs from which it receives records.
> > > > > Depending on the number of input operators (one or more) and the
> > > > > connection between the operator and its input operators (forward,
> > > > > partition, broadcast), an operator task might have a connection to one,
> > > > > some, or all tasks of its input operators. Each input task can send a
> > > > > different watermark, but each task will also send the same watermark to all
> > > > > its output tasks.
> > > > >
> > > > > So, this is a matter of distinguishing receiving (different) watermarks and
> > > > > emitting (the same) watermarks.
> > > >
> > > > Best, Fabian
> > > >
> > > >
> > > > > On 2019/05/03 07:32:07, Fabian Hueske <fhue...@gmail.com> wrote:
> > > > > > Hi,
> > > > > >
> > > > > > this should be covered here:
> > > > > >
> > > > >
> > >
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_time.html#watermarks-in-parallel-streams
> > > > > >
> > > > > > Best, Fabian
> > > > > >
> > > > > > > On Thu, May 2, 2019 at 17:48, an0 <an0...@gmail.com> wrote:
> > > > > >
> > > > > > > > This explanation is exactly what I'm looking for, thanks! Is such an
> > > > > > > > important rule documented anywhere in the official documentation?
> > > > > > >
> > > > > > > > On 2019/04/30 08:47:29, Fabian Hueske <fhue...@gmail.com> wrote:
> > > > > > > > > An operator task broadcasts its current watermark to all downstream tasks
> > > > > > > > > that might receive its records.
> > > > > > > > > If you have the following code:
> > > > > > > > >
> > > > > > > > > DataStream<X> a = ...
> > > > > > > > > a.map(A).map(B).keyBy(....).window(C)
> > > > > > > > >
> > > > > > > > > and execute this with parallelism 2, your plan looks like this
> > > > > > > > >
> > > > > > > > > A.1 -- B.1 --\--/-- C.1
> > > > > > > > >               X
> > > > > > > > > A.2 -- B.2 --/--\-- C.2
> > > > > > > > >
> > > > > > > > > A.1 will propagate its watermarks to B.1 because only B.1 will receive its
> > > > > > > > > output events.
> > > > > > > > > However, B.1 will propagate its watermarks to C.1 and C.2 because the
> > > > > > > > > output of B.1 is partitioned and all C tasks might receive output events
> > > > > > > > > from B.1.
> > > > > > > >
> > > > > > > > Best, Fabian
> > > > > > > >
> > > > > > > > > On Mon, Apr 29, 2019 at 20:06, an0 <an0...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > > Thanks very much. It definitely explains the problem I'm seeing. However,
> > > > > > > > > > something I need to confirm:
> > > > > > > > > > You say "Watermarks are broadcasted/forwarded anyway." Do you mean, in
> > > > > > > > > > assignTimestampsAndWatermarks.keyBy.window, it doesn't matter what data
> > > > > > > > > > flows through a specific key's stream, all key streams have the same
> > > > > > > > > > watermarks? So time-wise, `window` behaves as if `keyBy` is not there at
> > > > > > > > > > all?
> > > > > > > > >
> > > > > > > > > > On 2019/04/26 06:34:10, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
> > > > > > > > > > Hi,
> > > > > > > > > >
> > > > > > > > > > > Watermarks are meta events that travel independently of data events.
> > > > > > > > > > >
> > > > > > > > > > > 1) If you assignTimestampsAndWatermarks before keyBy, all parallel
> > > > > > > > > > > instances of trips have some data (this is my assumption), so Watermarks
> > > > > > > > > > > can be generated. Afterwards, even if some of the keyed partitions have
> > > > > > > > > > > no data, Watermarks are broadcasted/forwarded anyway. In other words, if
> > > > > > > > > > > at some point Watermarks were generated for all partitions of a single
> > > > > > > > > > > stage, they will be forwarded beyond this point.
> > > > > > > > > > >
> > > > > > > > > > > 2) If you assignTimestampsAndWatermarks after keyBy, you try to assign
> > > > > > > > > > > watermarks for an empty partition, which produces no Watermarks at all
> > > > > > > > > > > for this partition; therefore there is no progress beyond this point.
> > > > > > > > > >
> > > > > > > > > > I hope this clarifies it a bit.
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > >
> > > > > > > > > > Dawid
> > > > > > > > > >
> > > > > > > > > > On 25/04/2019 16:49, an0 wrote:
> > > > > > > > > > > > If my understanding is correct, then why does
> > > > > > > > > > > > `assignTimestampsAndWatermarks` before `keyBy` work? The `timeWindowAll`
> > > > > > > > > > > > stream's input streams are task 1 and task 2, with task 2 idling, no matter
> > > > > > > > > > > > whether `assignTimestampsAndWatermarks` is before or after `keyBy`, because
> > > > > > > > > > > > whether task 2 receives elements only depends on the key distribution and has
> > > > > > > > > > > > nothing to do with timestamp assignment, right?
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > >                  /key 1 trips\
> > > > > > > > > > >
> > > > > > > > >                /                    \
> > > > > > > > > > > (A) trips--> assignTimestampsAndWatermarks-->keyBy
> > > > > > > > > timeWindowAll
> > > > > > > > > > >
> > > > > > > > >                \       idle        /
> > > > > > > > > > >
> > > > > > > > >                  \key 2 trips/
> > > > > > > > > > >
> > > > > > > > > > >                            /key 1 trips-->
> > > > > > > > > assignTimestampsAndWatermarks\
> > > > > > > > > > >                          /
> > > > > > > > >                                      \
> > > > > > > > > > > (B) trips-->keyBy
> > > > > > > > >                            timeWindowAll
> > > > > > > > > > >                          \       idle
> > > > > > > > >                                    /
> > > > > > > > > > >                            \key 2 trips-->
> > > > > > > > > assignTimestampsAndWatermarks/
> > > > > > > > > > >
> > > > > > > > > > > > How are things different between A and B from `timeWindowAll`'s
> > > > > > > > > > > > perspective?
> > > > > > > > > > >
> > > > > > > > > > > BTW, thanks for the webinar link, I'll check it later.
> > > > > > > > > > >
> > > > > > > > > > > > On 2019/04/25 08:30:20, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
> > > > > > > > > > >> Hi,
> > > > > > > > > > >>
> > > > > > > > > > > >> Yes, I think your explanation is correct. I can also recommend Seth's
> > > > > > > > > > > >> webinar where he talks about debugging Watermarks [1]
> > > > > > > > > > >>
> > > > > > > > > > >> Best,
> > > > > > > > > > >>
> > > > > > > > > > >> Dawid
> > > > > > > > > > >>
> > > > > > > > > > >> [1]
> > > > > > > > > > >>
> > > > > > > > >
> > > > > > >
> > > > >
> > >
> https://www.ververica.com/resources/webinar/webinar/debugging-flink-tutorial
> > > > > > > > > > >>
> > > > > > > > > > >> On 22/04/2019 22:55, an0 wrote:
> > > > > > > > > > >>> Thanks, I feel I'm getting closer to the truth.
> > > > > > > > > > >>>
> > > > > > > > > > > >>> So parallelism is the cause? Say my parallelism is 2. Does that mean
> > > > > > > > > > > >>> I get 2 tasks running after `keyBy` even if all elements have the same key
> > > > > > > > > > > >>> and so go to 1 downstream task (say task 1)? And it is the other task (task 2)
> > > > > > > > > > > >>> with no incoming data that caused the `timeWindowAll` stream to be unable to
> > > > > > > > > > > >>> progress? Because both task 1 and task 2 are its input streams and one is
> > > > > > > > > > > >>> idling, so its event time cannot make progress?
> > > > > > > > > > >>>
> > > > > > > > > > > >>> On 2019/04/22 01:57:39, Guowei Ma <guowei....@gmail.com> wrote:
> > > > > > > > > > > >>>> Hi,
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>> A BoundedOutOfOrdernessTimestampExtractor can send a WM only after it
> > > > > > > > > > > >>>> has received at least one element.
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>> For after keyBy:
> > > > > > > > > > > >>>> Flink uses the hash code of the key and the parallelism of the downstream
> > > > > > > > > > > >>>> operator to decide which subtask receives the element. This means that if
> > > > > > > > > > > >>>> your key is always the same, all the sources will send the elements only to
> > > > > > > > > > > >>>> the same downstream task, for example only to
> > > > > > > > > > > >>>> BoundedOutOfOrdernessTimestampExtractor no. 3.
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>> For before keyBy:
> > > > > > > > > > > >>>> In your case, the source and the BoundedOutOfOrdernessTimestampExtractors
> > > > > > > > > > > >>>> would be chained together, which means every
> > > > > > > > > > > >>>> BoundedOutOfOrdernessTimestampExtractor will receive elements.
> > > > > > > > > > >>>>
> > > > > > > > > > >>>> Best,
> > > > > > > > > > >>>> Guowei
> > > > > > > > > > >>>>
> > > > > > > > > > >>>>
> > > > > > > > > > > >>>> an0 <an0...@gmail.com> wrote on Fri, Apr 19, 2019 at 10:41 PM:
> > > > > > > > > > >>>>
> > > > > > > > > > >>>>> Hi,
> > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> First of all, thank you for the `shuffle()` tip. It works. However, I
> > > > > > > > > > > >>>>> still don't understand why it doesn't work without calling `shuffle()`.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> Why would not all BoundedOutOfOrdernessTimestampExtractors receive trips?
> > > > > > > > > > > >>>>> All the trips have keys and timestamps. As I said in my reply to Paul, I see
> > > > > > > > > > > >>>>> the same watermarks being extracted.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> How could calling `assignTimestampsAndWatermarks` before vs. after `keyBy`
> > > > > > > > > > > >>>>> matter? My understanding is that any specific window for a specific key always
> > > > > > > > > > > >>>>> receives exactly the same data, and the calling order of
> > > > > > > > > > > >>>>> `assignTimestampsAndWatermarks` and `keyBy` shouldn't affect that.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> To make `keyBy` as irrelevant as possible, I tried letting it always
> > > > > > > > > > > >>>>> return the same key so that there is only 1 keyed stream and it is exactly
> > > > > > > > > > > >>>>> the same as the original unkeyed stream. It still doesn't trigger windows:
> > > > > > > > > > > >>>>> ```java
> > > > > > > > > > > >>>>> DataStream<Trip> trips = env.addSource(consumer);
> > > > > > > > > > > >>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> 0L);
> > > > > > > > > > > >>>>> DataStream<Trip> featurizedUserTrips =
> > > > > > > > > > > >>>>>         userTrips.map(trip -> trip).assignTimestampsAndWatermarks(
> > > > > > > > > > > >>>>>                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > > > > > > > > > >>>>>     @Override
> > > > > > > > > > > >>>>>     public long extractTimestamp(Trip trip) {
> > > > > > > > > > > >>>>>         return trip.endTime.getTime();
> > > > > > > > > > > >>>>>     }
> > > > > > > > > > > >>>>> });
> > > > > > > > > > > >>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> > > > > > > > > > > >>>>>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > > > > > > > > >>>>> ```
> > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> It makes no sense to me. Please help me understand why it doesn't work.
> > > > > > > > > > > >>>>> Thanks!
> > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> On 2019/04/19 04:14:31, Guowei Ma <guowei....@gmail.com> wrote:
> > > > > > > > > > >>>>>> Hi,
> > > > > > > > > > > >>>>>> After keyBy, maybe only some of the BoundedOutOfOrdernessTimestampExtractors
> > > > > > > > > > > >>>>>> receive the elements (trips). If that is the case, a
> > > > > > > > > > > >>>>>> BoundedOutOfOrdernessTimestampExtractor that does not receive any element
> > > > > > > > > > > >>>>>> would not send a WM, so the timeWindowAll operator could not be
> > > > > > > > > > > >>>>>> triggered.
> > > > > > > > > > > >>>>>> You could add a shuffle() before the assignTimestampsAndWatermarks in your
> > > > > > > > > > > >>>>>> second case and check if the window is triggered. If it is triggered,
> > > > > > > > > > > >>>>>> you could check the distribution of elements generated by the source.
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>> Best,
> > > > > > > > > > >>>>>> Guowei
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> an0...@gmail.com <an0...@gmail.com> wrote on Fri, Apr 19, 2019 at 4:10 AM:
> > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>>> I don't think it is the watermark. I see the same watermarks from the two
> > > > > > > > > > > >>>>>>> versions of code.
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> The processing on the keyed stream doesn't change event time at all. I can
> > > > > > > > > > > >>>>>>> simply change my code to use `map` on the keyed stream to return
> > > > > > > > > > > >>>>>>> the input data, so that the window operator receives exactly the same
> > > > > > > > > > > >>>>>>> data. The only difference is when I do `assignTimestampsAndWatermarks`. The
> > > > > > > > > > > >>>>>>> result is the same: `assignTimestampsAndWatermarks` before `keyBy` works:
> > > > > > > > > > > >>>>>>> ```java
> > > > > > > > > > > >>>>>>> DataStream<Trip> trips =
> > > > > > > > > > > >>>>>>>         env.addSource(consumer).assignTimestampsAndWatermarks(
> > > > > > > > > > > >>>>>>>                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > > > > > > > > > >>>>>>>     @Override
> > > > > > > > > > > >>>>>>>     public long extractTimestamp(Trip trip) {
> > > > > > > > > > > >>>>>>>         return trip.endTime.getTime();
> > > > > > > > > > > >>>>>>>     }
> > > > > > > > > > > >>>>>>> });
> > > > > > > > > > > >>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > > > > > > > > > >>>>>>> DataStream<Trip> featurizedUserTrips = userTrips.map(trip -> trip);
> > > > > > > > > > > >>>>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> > > > > > > > > > > >>>>>>>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > > > > > > > > >>>>>>> ```
> > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> `assignTimestampsAndWatermarks` after `keyBy` doesn't work:
> > > > > > > > > > > >>>>>>> ```java
> > > > > > > > > > > >>>>>>> DataStream<Trip> trips = env.addSource(consumer);
> > > > > > > > > > > >>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > > > > > > > > > >>>>>>> DataStream<Trip> featurizedUserTrips =
> > > > > > > > > > > >>>>>>>         userTrips.map(trip -> trip).assignTimestampsAndWatermarks(
> > > > > > > > > > > >>>>>>>                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > > > > > > > > > >>>>>>>     @Override
> > > > > > > > > > > >>>>>>>     public long extractTimestamp(Trip trip) {
> > > > > > > > > > > >>>>>>>         return trip.endTime.getTime();
> > > > > > > > > > > >>>>>>>     }
> > > > > > > > > > > >>>>>>> });
> > > > > > > > > > > >>>>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> > > > > > > > > > > >>>>>>>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > > > > > > > > >>>>>>> ```
> > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> It feels like a bug to me, but I want to confirm it before I file the
> > > > > > > > > > > >>>>>>> bug report.
> > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> On 2019/04/18 03:38:34, Paul Lam <paullin3...@gmail.com> wrote:
> > > > > > > > > > >>>>>>>> Hi,
> > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> Could you check the watermark of the window operator? One possible
> > > > > > > > > > > >>>>>>>> situation would be that some of the keys are not getting enough inputs, so
> > > > > > > > > > > >>>>>>>> their watermarks remain below the window end time and hold the window
> > > > > > > > > > > >>>>>>>> operator watermark back. IMO, it’s a good practice to assign watermarks
> > > > > > > > > > > >>>>>>>> earlier in the data pipeline.
> > > > > > > > > > >>>>>>>> Best,
> > > > > > > > > > >>>>>>>> Paul Lam
> > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>>> On Apr 17, 2019, at 23:04, an0...@gmail.com wrote:
> > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>> `assignTimestampsAndWatermarks` before `keyBy` works:
> > > > > > > > > > > >>>>>>>>> ```java
> > > > > > > > > > > >>>>>>>>> DataStream<Trip> trips =
> > > > > > > > > > > >>>>>>>>>         env.addSource(consumer).assignTimestampsAndWatermarks(
> > > > > > > > > > > >>>>>>>>>                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > > > > > > > > > >>>>>>>>>             @Override
> > > > > > > > > > > >>>>>>>>>             public long extractTimestamp(Trip trip) {
> > > > > > > > > > > >>>>>>>>>                 return trip.endTime.getTime();
> > > > > > > > > > > >>>>>>>>>             }
> > > > > > > > > > > >>>>>>>>>         });
> > > > > > > > > > > >>>>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > > > > > > > > > >>>>>>>>> DataStream<FeaturizedTrip> featurizedUserTrips =
> > > > > > > > > > > >>>>>>>>>         userTrips.process(new Featurization());
> > > > > > > > > > > >>>>>>>>> AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> > > > > > > > > > > >>>>>>>>>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > > > > > > > > >>>>>>>>> ```
> > > > > > > > > > >>>>>>>>>
> > > > > > > > > > >>>>>>>>> But not after `keyBy` and `process`:
> > > > > > > > > > > >>>>>>>>> ```java
> > > > > > > > > > > >>>>>>>>> DataStream<Trip> trips = env.addSource(consumer);
> > > > > > > > > > > >>>>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > > > > > > > > > >>>>>>>>> DataStream<FeaturizedTrip> featurizedUserTrips =
> > > > > > > > > > > >>>>>>>>>         userTrips.process(new Featurization()).assignTimestampsAndWatermarks(
> > > > > > > > > > > >>>>>>>>>                 new BoundedOutOfOrdernessTimestampExtractor<FeaturizedTrip>(Time.days(1)) {
> > > > > > > > > > > >>>>>>>>>             @Override
> > > > > > > > > > > >>>>>>>>>             public long extractTimestamp(FeaturizedTrip trip) {
> > > > > > > > > > > >>>>>>>>>                 return trip.endTime.getTime();
> > > > > > > > > > > >>>>>>>>>             }
> > > > > > > > > > > >>>>>>>>>         });
> > > > > > > > > > > >>>>>>>>> AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> > > > > > > > > > > >>>>>>>>>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > > > > > > > > >>>>>>>>> ```
> > > > > > > > > > >>>>>>>>> Windows are never triggered.
> > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>> Is it a bug or expected behavior? If the latter, where is it
> > > > > > > > > > > >>>>>>>>> documented?
> > > > > > > > > > >>
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
