Thanks Timo for the suggestion! Also apologies for missing your response
last week. I will try to come up with a reproducible test case.

On Wed, Mar 18, 2020 at 9:27 AM Timo Walther <twal...@apache.org> wrote:

> Hi Vinod,
>
> thanks for answering my questions. The == Optimized Logical Plan ==
> looks as expected. However, the == Physical Execution Plan == seems to
> be quite complex. Are you sure that watermarks don't get lost in some of
> those custom operators before entering the SQL part of the pipeline?
>
> I think if there is a bug in the SQL code base, we would need to come up
> with a small table program that reproduces the described problem, so
> that we can do a remote debugging session.
>
> Maybe you can do this in your local cluster? There are basically two
> runtime operators
>
> org.apache.flink.table.runtime.join.RowTimeBoundedStreamJoin
>
> and
>
> the regular DataStream API windows. I don't expect bugs in DataStream
> API windows, so I would suggest verifying the join operator.
>
> I hope this helps.
>
> Regards,
> Timo
>
>
>
> On 13.03.20 23:56, Vinod Mehra wrote:
> > Thanks Timo for responding back! Answers below:
> >
> >  > 1) Which planner are you using?
> >
> > We are using Flink 1.8 and using the default planner
> > (org.apache.flink.table.calcite.FlinkPlannerImpl)
> > from: org.apache.flink:flink-table-planner_2.11:1.8
> >
> >  > 2) How do you create your watermarks?
> >
> > We are using periodic watermarking and have configured stream time
> > characteristics as TimeCharacteristic.EventTime. The watermark assigner
> > extracts the timestamp from time attributes from the event and keeps it
> > 5 seconds behind the maximum timestamp seen in order to allow for stale
> > events.
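To make that watermark behavior concrete, here is a minimal Python sketch (illustrative only; the class name and structure are not Flink's actual assigner API) of the periodic, bounded-lateness logic described above:

```python
class BoundedLatenessWatermarker:
    """Periodic watermark that trails the max event timestamp by a fixed bound."""

    def __init__(self, max_lateness_ms=5000):
        self.max_lateness_ms = max_lateness_ms
        self.max_ts = None

    def on_event(self, event_ts_ms):
        # Track the maximum event timestamp observed so far.
        if self.max_ts is None or event_ts_ms > self.max_ts:
            self.max_ts = event_ts_ms

    def current_watermark(self):
        # No events yet -> no watermark to emit.
        if self.max_ts is None:
            return None
        # Watermark is held 5 seconds behind the max timestamp seen,
        # so stale (out-of-order) events within that bound are not dropped.
        return self.max_ts - self.max_lateness_ms
```

With events at t = 10000, 7000, and 12000 ms, the watermark is 12000 - 5000 = 7000 ms, so downstream rowtime-based operators only advance once newer events push the maximum forward.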
> >
> >  > 3) Did you unit test with only parallelism of 1 or higher?
> >
> > I tried both 1 and higher values in tests, and the unit tests work as
> > expected for all parallelism values.
> >
> > 4) Can you share the output of TableEnvironment.explain() with us?
> >
> > Attached. Please note that I had obfuscated the query a bit in my
> > original post for clarity. I have pasted the actual query along with the
> > plan so that you can correlate it.
> >
> >  > Shouldn't c have a rowtime constraint around o instead of r? Such
> > that all time-based operations work on o.rowtime?
> >
> > I have tried both (and some more variations). Got the same results (unit
> > tests passes but production execution doesn't join as expected). Here is
> > the modified query:
> >
> > SELECT o.region_code,
> > concat_ws(
> > '/',
> > CAST(sum(CASE WHEN r.order_id IS NOT NULL AND c.order_id IS NULL THEN 1
> > ELSE 0 END) AS VARCHAR),
> > CAST(count(1) AS VARCHAR)
> > ) AS offer_conversion_5m
> > FROM (
> > SELECT region_code,
> > offer_id,
> > rowtime
> > FROM event_offer_created
> > WHERE ...
> > ) o
> > LEFT JOIN (
> > SELECT offer_id,
> > order_id,
> > rowtime
> > FROM event_order_requested
> > WHERE ...
> > ) r
> > ON o.offer_id = r.offer_id
> > AND o.rowtime BETWEEN r.rowtime - INTERVAL '1' hour AND r.rowtime
> >
> > LEFT JOIN (
> > SELECT order_id,
> > rowtime
> > FROM event_order_cancelled
> > WHERE ...
> > ) c
> > ON r.order_id = c.order_id
> > AND o.rowtime BETWEEN c.rowtime - INTERVAL '2' hour AND c.rowtime
> >
> > GROUP BY
> > o.region_code,
> >         TUMBLE(o.rowtime,INTERVAL '5' minute)
> >
> >
> > We used two hours ("c.rowtime - INTERVAL '2' hour") in the 2nd time
> > window because it spans from the first table (o) to the third (c), so
> > it has to cover both one-hour join windows.
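To make the window arithmetic concrete, the two interval predicates can be sketched as plain functions (a hypothetical illustration in Python, not Flink code; timestamps are epoch milliseconds), showing why the second window needs two hours:

```python
HOUR_MS = 3600 * 1000

def within_or_window(o_ts, r_ts):
    # o.rowtime BETWEEN r.rowtime - INTERVAL '1' HOUR AND r.rowtime
    return r_ts - 1 * HOUR_MS <= o_ts <= r_ts

def within_oc_window(o_ts, c_ts):
    # o.rowtime BETWEEN c.rowtime - INTERVAL '2' HOUR AND c.rowtime
    # Two hours because the cancel (c) may occur up to 1 hour after the
    # order (r), which itself may occur up to 1 hour after the offer (o).
    return c_ts - 2 * HOUR_MS <= o_ts <= c_ts
```

For example, an offer at t=0 joined to an order 50 minutes later and a cancel 110 minutes later satisfies both predicates, while a cancel 130 minutes after the offer falls outside the 2-hour bound.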
> >
> > -- Vinod
> >
> > On Fri, Mar 13, 2020 at 6:42 AM Timo Walther <twal...@apache.org
> > <mailto:twal...@apache.org>> wrote:
> >
> >     Hi Vinod,
> >
> >     I cannot spot any problems in your SQL query.
> >
> >     Some questions for clarification:
> >     1) Which planner are you using?
> >     2) How do you create your watermarks?
> >     3) Did you unit test with only parallelism of 1 or higher?
> >     4) Can you share the output of TableEnvironment.explain() with us?
> >
> >     Shouldn't c have a rowtime constraint around o instead of r? Such
> >     that all time-based operations work on o.rowtime?
> >
> >     Regards,
> >     Timo
> >
> >
> >     On 10.03.20 19:26, Vinod Mehra wrote:
> >      > Hi!
> >      >
> >      > We are testing the following 3 way time windowed join to keep the
> >      > retained state size small. Using joins for the first time here.
> >     It works
> >      > in unit tests but we are not able to get expected results in
> >     production.
> >      > We are still troubleshooting this issue. Can you please help us
> >     review
> >      > this in case we missed something or our assumptions are wrong?
> >      >
> >      > SELECT o.region_code,
> >      >         concat_ws(
> >      >           '/',
> >      >           CAST(sum(CASE WHEN r.order_id IS NOT NULL AND
> >      >                c.order_id IS NULL THEN 1 ELSE 0 END) AS VARCHAR),
> >      >           CAST(count(1) AS VARCHAR)
> >      >         ) AS offer_conversion_5m
> >      >    FROM (
> >      >          SELECT region_code,
> >      >                 offer_id,
> >      >                 rowtime
> >      >            FROM event_offer_created
> >      >           WHERE ...
> >      > ) o
> >      >     LEFT JOIN (
> >      >          SELECT offer_id,
> >      >                 order_id,
> >      >                 rowtime
> >      >            FROM event_order_requested
> >      >           WHERE ...
> >      > ) r
> >      >       ON o.offer_id = r.offer_id
> >      >       AND r.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL '1' hour
> >      > LEFT JOIN (
> >      >          SELECT order_id,
> >      >                 rowtime
> >      >            FROM event_order_cancelled
> >      >           WHERE ...
> >      > ) c
> >      >       ON r.order_id = c.order_id
> >      >       AND c.rowtime BETWEEN r.rowtime AND r.rowtime + INTERVAL '1' hour
> >      > GROUP BY
> >      > o.region_code,
> >      >         TUMBLE(o.rowtime,INTERVAL '5' minute)
> >      >
> >      >
> >      > The sequence of events is:
> >      >
> >      >  1. At time X an offer is created (event stream =
> >     "*event_offer_created"*)
> >      >  2. At time Y that offer is used to create an order (event stream
> =
> >      >     "*event_order_requested*"). Left join because not all offers
> >     get used.
> >      >  3. At time Z that order is cancelled (event stream =
> >      >     "*event_order_cancelled*"). Left join because not all orders
> get
> >      >     cancelled.
> >      >
> >      > "*offer_conversion_5m*" represents: number of converted orders /
> >     total
> >      > number of offerings" in a 5 minutes bucket. If an order gets
> >     cancelled
> >      > we don't want to count that. That's why we have [c.order_id IS
> >     NULL THEN
> >      > 1 ELSE 0 END] in the select.
> >      >
> >      > We picked 1 hour time windows because that's the maximum time we
> >     expect
> >      > the successive events to take for a given record chain.
> >      >
> >      > The outer GROUP BY is to get 5 minute aggregation for each
> >     "region". As
> >      > expected the watermark lags 2 hour from the current time because
> >     of the
> >      > two time-window joins above. The IdleStateRetentionTime is not
> >     set, so
> >      > the expectation is that the state will be retained as per the time
> >      > window size and as the records fall off the window the state will
> be
> >      > cleaned up. The aggregated state is expected to be kept around
> for 5
> >      > minutes (GROUP BY).
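For reference, the tumbling-window bookkeeping we are assuming works roughly like this (a Python sketch of the semantics, not Flink internals; timestamps are epoch milliseconds):

```python
FIVE_MIN_MS = 5 * 60 * 1000

def tumble_window(ts_ms, size_ms=FIVE_MIN_MS):
    # TUMBLE(rowtime, INTERVAL '5' MINUTE): each event falls into the
    # 5-minute bucket [start, end) that contains its rowtime.
    start = ts_ms - (ts_ms % size_ms)
    return start, start + size_ms

def window_fires(window_end_ms, watermark_ms):
    # A tumbling window emits its result (and its state becomes eligible
    # for cleanup) once the watermark reaches the window end. With two
    # 1-hour interval joins upstream holding the watermark back ~2 hours,
    # results appear roughly 2 hours after the window closes in event time.
    return watermark_ms >= window_end_ms
```

For example, an event at t = 7 minutes lands in the [5 min, 10 min) bucket, and that bucket only emits once the (lagging) watermark passes the 10-minute mark.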
> >      >
> >      > However, we are unable to see the conversion (offer_created ->
> >      > order_requested (without order_cancelled)).
> >     '*offer_conversion_5m*' is
> >      > always zero although we know the streams contain records that
> should
> >      > have incremented the count. Any idea what could be wrong? Is the
> >     state
> >      > being dropped too early (5 mins) because of the outer 5 minute
> >     tumbling
> >      > window?
> >      >
> >      > Thanks,
> >      > Vinod
> >
>
>
