Thanks Timo for the suggestion! Also apologies for missing your response last week. I will try to come up with a reproducible test case.
On Wed, Mar 18, 2020 at 9:27 AM Timo Walther <twal...@apache.org> wrote:

> Hi Vinod,
>
> thanks for answering my questions. The == Optimized Logical Plan ==
> looks as expected. However, the == Physical Execution Plan == seems to
> be quite complex. Are you sure that watermarks don't get lost in some of
> those custom operators before entering the SQL part of the pipeline?
>
> I think if there is a bug in the SQL code base, we would need to come up
> with a small table program that reproduces the described problem, such
> that we can do a remote debugging session.
>
> Maybe you can do this in your local cluster? There are basically two
> runtime operators:
>
>     org.apache.flink.table.runtime.join.RowTimeBoundedStreamJoin
>
> and the regular DataStream API windows. I don't expect bugs in the
> DataStream API windows, so I would suggest verifying the join operator.
>
> I hope this helps.
>
> Regards,
> Timo
>
> On 13.03.20 23:56, Vinod Mehra wrote:
> > Thanks Timo for responding back! Answers below:
> >
> > > 1) Which planner are you using?
> >
> > We are using Flink 1.8 with the default planner
> > (org.apache.flink.table.calcite.FlinkPlannerImpl)
> > from org.apache.flink:flink-table-planner_2.11:1.8.
> >
> > > 2) How do you create your watermarks?
> >
> > We use periodic watermarking and have configured the stream time
> > characteristic as TimeCharacteristic.EventTime. The watermark assigner
> > extracts the timestamp from the event's time attribute and keeps the
> > watermark 5 seconds behind the maximum timestamp seen, in order to
> > allow for stale events.
> >
> > > 3) Did you unit test with only parallelism of 1 or higher?
> >
> > I tried both 1 and higher values in tests, and the unit tests work as
> > expected for all parallelism values.
> >
> > > 4) Can you share the output of TableEnvironment.explain() with us?
> >
> > Attached. Please note that I had obfuscated the query a bit in my
> > original post for clarity.
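The watermarking described above (watermark trailing the maximum observed event timestamp by 5 seconds) can be sketched outside Flink as plain Python. This is only an illustration of the bounded-out-of-orderness idea, not Flink code; the class name is made up:

```python
class BoundedOutOfOrdernessWatermarker:
    """Illustrative sketch: the watermark trails the maximum event-time
    timestamp seen so far by a fixed out-of-orderness allowance."""

    def __init__(self, max_out_of_orderness_ms: int = 5000):
        self.max_out_of_orderness_ms = max_out_of_orderness_ms
        self.max_timestamp_ms = float("-inf")

    def on_event(self, event_timestamp_ms: int) -> None:
        # Track the largest event-time timestamp observed so far.
        self.max_timestamp_ms = max(self.max_timestamp_ms, event_timestamp_ms)

    def current_watermark_ms(self) -> float:
        # Watermark = max timestamp seen minus the allowed lateness.
        return self.max_timestamp_ms - self.max_out_of_orderness_ms


wm = BoundedOutOfOrdernessWatermarker()
wm.on_event(10_000)
wm.on_event(8_000)  # a late event does not move the watermark backwards
print(wm.current_watermark_ms())  # 5000
```

The key property for the join question above: any operator between the sources and the SQL join must propagate such watermarks, otherwise event-time progress stalls and windowed results are never emitted.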
> > I have pasted the actual query along with the plan so that you can
> > correlate it.
> >
> > > Shouldn't c have a rowtime constraint around o instead of r? Such
> > > that all time-based operations work on o.rowtime?
> >
> > I have tried both (and some more variations) and got the same results
> > (the unit tests pass but the production execution doesn't join as
> > expected). Here is the modified query:
> >
> > SELECT o.region_code,
> >        concat_ws(
> >            '/',
> >            CAST(sum(CASE WHEN r.order_id IS NOT NULL AND c.order_id IS NULL THEN 1 ELSE 0 END) AS VARCHAR),
> >            CAST(count(1) AS VARCHAR)
> >        ) AS offer_conversion_5m
> > FROM (
> >     SELECT region_code,
> >            offer_id,
> >            rowtime
> >     FROM event_offer_created
> >     WHERE ...
> > ) o
> > LEFT JOIN (
> >     SELECT offer_id,
> >            order_id,
> >            rowtime
> >     FROM event_order_requested
> >     WHERE ...
> > ) r
> > ON o.offer_id = r.offer_id
> > AND o.rowtime BETWEEN r.rowtime - INTERVAL '1' hour AND r.rowtime
> > LEFT JOIN (
> >     SELECT order_id,
> >            rowtime
> >     FROM event_order_cancelled
> >     WHERE ...
> > ) c
> > ON r.order_id = c.order_id
> > AND o.rowtime BETWEEN c.rowtime - INTERVAL '2' hour AND c.rowtime
> > GROUP BY
> >     o.region_code,
> >     TUMBLE(o.rowtime, INTERVAL '5' minute)
> >
> > We used minus two hours ("c.rowtime - INTERVAL '2' hour") in the second
> > time window because it spans from the first table to the third one.
> >
> > -- Vinod
> >
> > On Fri, Mar 13, 2020 at 6:42 AM Timo Walther <twal...@apache.org
> > <mailto:twal...@apache.org>> wrote:
> >
> > Hi Vinod,
> >
> > I cannot spot any problems in your SQL query.
> >
> > Some questions for clarification:
> > 1) Which planner are you using?
> > 2) How do you create your watermarks?
> > 3) Did you unit test with only parallelism of 1 or higher?
> > 4) Can you share the output of TableEnvironment.explain() with us?
> >
> > Shouldn't c have a rowtime constraint around o instead of r? Such that
> > all time-based operations work on o.rowtime?
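The rewritten first join condition above is algebraically the same as the original one: "o.rowtime BETWEEN r.rowtime - 1h AND r.rowtime" accepts exactly the pairs where "r.rowtime BETWEEN o.rowtime AND o.rowtime + 1h". A small plain-Python sketch (illustrative timestamps only, not Flink code) makes the equivalence easy to check:

```python
from datetime import datetime, timedelta

def between(ts, lower, upper):
    # SQL BETWEEN: inclusive on both bounds.
    return lower <= ts <= upper

ONE_HOUR = timedelta(hours=1)
o = datetime(2020, 3, 10, 12, 0)   # offer created
r = datetime(2020, 3, 10, 12, 40)  # order requested 40 minutes later

# Modified query:  o.rowtime BETWEEN r.rowtime - 1h AND r.rowtime
# Original query:  r.rowtime BETWEEN o.rowtime AND o.rowtime + 1h
# Both express "r happens within 1 hour after o", so they must agree:
assert between(o, r - ONE_HOUR, r) == between(r, o, o + ONE_HOUR)
print(between(o, r - ONE_HOUR, r))  # True
```

So swapping between the two formulations is not expected to change the join results, which is consistent with both variants behaving the same in production.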
> >
> > Regards,
> > Timo
> >
> > On 10.03.20 19:26, Vinod Mehra wrote:
> > > Hi!
> > >
> > > We are testing the following 3-way time-windowed join to keep the
> > > retained state size small. We are using joins for the first time here.
> > > It works in unit tests, but we are not able to get the expected
> > > results in production. We are still troubleshooting this issue. Can
> > > you please help us review this in case we missed something or our
> > > assumptions are wrong?
> > >
> > > SELECT o.region_code,
> > >        concat_ws(
> > >            '/',
> > >            CAST(sum(CASE WHEN r.order_id IS NOT NULL AND c.order_id IS NULL THEN 1 ELSE 0 END) AS VARCHAR),
> > >            CAST(count(1) AS VARCHAR)
> > >        ) AS offer_conversion_5m
> > > FROM (
> > >     SELECT region_code,
> > >            offer_id,
> > >            rowtime
> > >     FROM event_offer_created
> > >     WHERE ...
> > > ) o
> > > LEFT JOIN (
> > >     SELECT offer_id,
> > >            order_id,
> > >            rowtime
> > >     FROM event_order_requested
> > >     WHERE ...
> > > ) r
> > > ON o.offer_id = r.offer_id
> > > AND r.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL '1' hour
> > > LEFT JOIN (
> > >     SELECT order_id,
> > >            rowtime
> > >     FROM event_order_cancelled
> > >     WHERE ...
> > > ) c
> > > ON r.order_id = c.order_id
> > > AND c.rowtime BETWEEN r.rowtime AND r.rowtime + INTERVAL '1' hour
> > > GROUP BY
> > >     o.region_code,
> > >     TUMBLE(o.rowtime, INTERVAL '5' minute)
> > >
> > > The sequence of events is:
> > >
> > > 1. At time X an offer is created (event stream =
> > >    "*event_offer_created*").
> > > 2. At time Y that offer is used to create an order (event stream =
> > >    "*event_order_requested*"). Left join because not all offers get
> > >    used.
> > > 3. At time Z that order is cancelled (event stream =
> > >    "*event_order_cancelled*"). Left join because not all orders get
> > >    cancelled.
> > >
> > > "*offer_conversion_5m*" represents "number of converted orders /
> > > total number of offerings" in a 5-minute bucket.
> > > If an order gets cancelled we don't want to count it. That's why we
> > > have [c.order_id IS NULL THEN 1 ELSE 0 END] in the select.
> > >
> > > We picked 1-hour time windows because that's the maximum time we
> > > expect the successive events to take for a given record chain.
> > >
> > > The outer GROUP BY is to get a 5-minute aggregation for each
> > > "region". As expected, the watermark lags 2 hours behind the current
> > > time because of the two time-window joins above. The
> > > IdleStateRetentionTime is not set, so the expectation is that state
> > > will be retained as per the time window size, and as records fall off
> > > the window the state will be cleaned up. The aggregated state is
> > > expected to be kept around for 5 minutes (GROUP BY).
> > >
> > > However, we are unable to see the conversion (offer_created ->
> > > order_requested (without order_cancelled)). '*offer_conversion_5m*'
> > > is always zero, although we know the streams contain records that
> > > should have incremented the count. Any idea what could be wrong? Is
> > > the state being dropped too early (5 mins) because of the outer
> > > 5-minute tumbling window?
> > >
> > > Thanks,
> > > Vinod
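The intended aggregation semantics (tumbling 5-minute buckets per region, counting only requested-but-not-cancelled offers as conversions) can be simulated in plain Python on rows as they would look after the two LEFT JOINs. This is only a sketch with hypothetical tuples; it illustrates the TUMBLE bucketing and the CASE WHEN counting, not Flink's join or state handling:

```python
from collections import defaultdict

def tumble_start_ms(ts_ms: int, size_ms: int) -> int:
    # Tumbling windows are fixed-size, non-overlapping, epoch-aligned
    # buckets; each timestamp belongs to exactly one window.
    return ts_ms - (ts_ms % size_ms)

def offer_conversion_5m(joined_rows):
    """joined_rows: (region_code, offer_rowtime_ms, r_order_id, c_order_id)
    as produced by the two LEFT JOINs; r_order_id / c_order_id are None
    when the corresponding join found no match."""
    buckets = defaultdict(lambda: [0, 0])  # key -> [converted, total]
    for region, o_rowtime, r_order_id, c_order_id in joined_rows:
        key = (region, tumble_start_ms(o_rowtime, 5 * 60 * 1000))
        # CASE WHEN r.order_id IS NOT NULL AND c.order_id IS NULL THEN 1 ELSE 0
        if r_order_id is not None and c_order_id is None:
            buckets[key][0] += 1
        buckets[key][1] += 1  # count(1)
    # concat_ws('/', converted, total)
    return {k: f"{conv}/{total}" for k, (conv, total) in buckets.items()}

rows = [
    ("SF", 0,       "ord-1", None),     # offer converted to an order
    ("SF", 60_000,  "ord-2", "ord-2"),  # converted but later cancelled
    ("SF", 120_000, None,    None),     # offer never used
]
print(offer_conversion_5m(rows))  # {('SF', 0): '1/3'}
```

If the production pipeline shows a non-zero denominator but a zero numerator, the r rows are arriving as NULL, which points at the first interval join (or the watermarks feeding it) rather than at the GROUP BY: the tumbling-window state lives until the watermark passes the window end, so the 5-minute window by itself should not drop matches early.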