Thanks, Timo, for responding! Answers below:

> 1) Which planner are you using?
We are using Flink 1.8 with the default planner (org.apache.flink.table.calcite.FlinkPlannerImpl) from org.apache.flink:flink-table-planner_2.11:1.8.

> 2) How do you create your watermarks?

We are using periodic watermarks and have set the stream time characteristic to TimeCharacteristic.EventTime. The watermark assigner extracts the timestamp from the event's time attribute and keeps the watermark 5 seconds behind the maximum timestamp seen so far, to allow for out-of-order events.

> 3) Did you unit test with only parallelism of 1 or higher?

I tried both 1 and higher values in tests, and for all parallelism values the unit tests work as expected.

> 4) Can you share the output of TableEnvironment.explain() with us?

Attached. Please note that I had obfuscated the query a bit in my original post for clarity. I have pasted the actual query along with the plan so that you can correlate the two.

> Shouldn't c have a rowtime constraint around o instead of r? Such that
> all time-based operations work on o.rowtime?

I have tried both (and a few more variations) and got the same result: the unit tests pass, but the production job doesn't join as expected. Here is the modified query:

SELECT o.region_code,
       concat_ws(
         '/',
         CAST(sum(CASE WHEN r.order_id IS NOT NULL AND c.order_id IS NULL THEN 1 ELSE 0 END) AS VARCHAR),
         CAST(count(1) AS VARCHAR)
       ) AS offer_conversion_5m
FROM (
  SELECT region_code,
         offer_id,
         rowtime
  FROM event_offer_created
  WHERE ...
) o
LEFT JOIN (
  SELECT offer_id,
         order_id,
         rowtime
  FROM event_order_requested
  WHERE ...
) r
ON o.offer_id = r.offer_id
AND o.rowtime BETWEEN r.rowtime - INTERVAL '1' hour AND r.rowtime
LEFT JOIN (
  SELECT order_id,
         rowtime
  FROM event_order_cancelled
  WHERE ...
) c
ON r.order_id = c.order_id
AND o.rowtime BETWEEN c.rowtime - INTERVAL '2' hour AND c.rowtime
GROUP BY
  o.region_code,
  TUMBLE(o.rowtime, INTERVAL '5' minute)

We used minus two hours ("c.rowtime - INTERVAL '2' hour") in the second time window because this condition relates the first table (o) directly to the third one (c), so it has to span both 1-hour hops (offer to order, then order to cancellation).
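The reasoning behind the 2-hour bound can be checked mechanically. The sketch below is a toy model. It assumes integer timestamps in minutes instead of Flink rowtimes, and the function names are hypothetical; each one mirrors one of the BETWEEN predicates from the queries in this thread. It brute-forces that the two 1-hour hops together imply the 2-hour o-to-c bound. It also shows that the converse does not hold: for a given order r, the 2-hour predicate on its own admits some cancellations that the r-based predicate rejects.

```python
"""Toy check of the interval predicates used in the join conditions.

Assumption: timestamps are plain integers in minutes, not Flink rowtimes.
"""

HOUR = 60  # INTERVAL '1' hour, in minutes


def r_joins_o(o: int, r: int) -> bool:
    # o.rowtime BETWEEN r.rowtime - INTERVAL '1' hour AND r.rowtime
    return r - HOUR <= o <= r


def c_joins_r(r: int, c: int) -> bool:
    # c.rowtime BETWEEN r.rowtime AND r.rowtime + INTERVAL '1' hour
    return r <= c <= r + HOUR


def c_joins_o(o: int, c: int) -> bool:
    # o.rowtime BETWEEN c.rowtime - INTERVAL '2' hour AND c.rowtime
    return c - 2 * HOUR <= o <= c


def two_hops_imply_two_hours(limit: int = 3 * HOUR) -> bool:
    """Brute-force check: both 1-hour hops hold => the 2-hour bound holds.

    The predicates only depend on time differences, so o is fixed at 0.
    """
    o = 0
    for r in range(-limit, limit + 1):
        for c in range(-limit, limit + 1):
            if r_joins_o(o, r) and c_joins_r(r, c) and not c_joins_o(o, c):
                return False
    return True


if __name__ == "__main__":
    print(two_hops_imply_two_hours())  # True
    # Converse fails: offer at 0, order at 10, cancellation at 100 minutes.
    print(c_joins_o(0, 100), c_joins_r(10, 100))  # True False
```

In other words, the o-based 2-hour condition is a looser (implied) version of the original r-based one, not an exact equivalent.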
--
Vinod

On Fri, Mar 13, 2020 at 6:42 AM Timo Walther <twal...@apache.org> wrote:
> Hi Vinod,
>
> I cannot spot any problems in your SQL query.
>
> Some questions for clarification:
> 1) Which planner are you using?
> 2) How do you create your watermarks?
> 3) Did you unit test with only parallelism of 1 or higher?
> 4) Can you share the output of TableEnvironment.explain() with us?
>
> Shouldn't c have a rowtime constraint around o instead of r? Such that
> all time-based operations work on o.rowtime?
>
> Regards,
> Timo
>
>
> On 10.03.20 19:26, Vinod Mehra wrote:
> > Hi!
> >
> > We are testing the following 3 way time windowed join to keep the
> > retained state size small. Using joins for the first time here. It works
> > in unit tests but we are not able to get expected results in production.
> > We are still troubleshooting this issue. Can you please help us review
> > this in case we missed something or our assumptions are wrong?
> >
> > SELECT o.region_code,
> >        concat_ws(
> >          '/',
> >          CAST(sum(CASE WHEN r.order_id IS NOT NULL AND c.order_id IS NULL THEN 1 ELSE 0 END) AS VARCHAR),
> >          CAST(count(1) AS VARCHAR)
> >        ) AS offer_conversion_5m
> > FROM (
> >   SELECT region_code,
> >          offer_id,
> >          rowtime
> >   FROM event_offer_created
> >   WHERE ...
> > ) o
> > LEFT JOIN (
> >   SELECT offer_id,
> >          order_id,
> >          rowtime
> >   FROM event_order_requested
> >   WHERE ...
> > ) r
> > ON o.offer_id = r.offer_id
> > AND r.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL '1' hour
> > LEFT JOIN (
> >   SELECT order_id,
> >          rowtime
> >   FROM event_order_cancelled
> >   WHERE ...
> > ) c
> > ON r.order_id = c.order_id
> > AND c.rowtime BETWEEN r.rowtime AND r.rowtime + INTERVAL '1' hour
> > GROUP BY
> >   o.region_code,
> >   TUMBLE(o.rowtime, INTERVAL '5' minute)
> >
> >
> > The sequence of events is:
> >
> > 1. At time X an offer is created (event stream = "*event_offer_created*")
> > 2. At time Y that offer is used to create an order (event stream =
> >    "*event_order_requested*").
> >    Left join because not all offers get used.
> > 3. At time Z that order is cancelled (event stream =
> >    "*event_order_cancelled*"). Left join because not all orders get
> >    cancelled.
> >
> > "*offer_conversion_5m*" represents "number of converted orders / total
> > number of offerings" in a 5-minute bucket. If an order gets cancelled,
> > we don't want to count it. That's why we have [c.order_id IS NULL THEN
> > 1 ELSE 0 END] in the select.
> >
> > We picked 1-hour time windows because that's the maximum time we expect
> > successive events to take for a given record chain.
> >
> > The outer GROUP BY is to get a 5-minute aggregation for each "region". As
> > expected, the watermark lags 2 hours behind the current time because of the
> > two time-window joins above. The IdleStateRetentionTime is not set, so
> > the expectation is that the state will be retained as per the time
> > window size and, as the records fall off the window, the state will be
> > cleaned up. The aggregated state is expected to be kept around for 5
> > minutes (GROUP BY).
> >
> > However, we are unable to see the conversion (offer_created ->
> > order_requested (without order_cancelled)). '*offer_conversion_5m*' is
> > always zero although we know the streams contain records that should
> > have incremented the count. Any idea what could be wrong? Is the state
> > being dropped too early (5 mins) because of the outer 5-minute tumbling
> > window?
> >
> > Thanks,
> > Vinod
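For reference, the result the query is meant to produce can be written as a plain batch computation. This is a simplified sketch, not how Flink evaluates the query:

- It assumes all events are already available and ignores watermarks, late data, and state cleanup, which are exactly the streaming aspects in question.
- It uses hypothetical record types named after the obfuscated columns.
- It takes millisecond timestamps, so the constants match the explain plan (3600000 for one hour, 300000 for the 5-minute window).

Each offer yields one row per matching order, or one NULL row if there is none. Each of those rows yields one row per matching cancellation, or one NULL row. Every resulting row contributes to count(1).

```python
from collections import defaultdict
from typing import Dict, List, NamedTuple, Optional, Tuple

HOUR_MS = 3_600_000   # INTERVAL '1' hour
WINDOW_MS = 300_000   # TUMBLE(o.rowtime, INTERVAL '5' minute)


class Offer(NamedTuple):
    region_code: str
    offer_id: str
    rowtime: int


class OrderRequested(NamedTuple):
    offer_id: str
    order_id: str
    rowtime: int


class OrderCancelled(NamedTuple):
    order_id: str
    rowtime: int


def offer_conversion_5m(
    offers: List[Offer],
    requests: List[OrderRequested],
    cancels: List[OrderCancelled],
) -> Dict[Tuple[str, int], str]:
    """Batch model of the 3-way LEFT JOIN plus the 5-minute tumbling GROUP BY.

    Returns {(region_code, window_start_ms): "converted/total"}.
    """
    agg: Dict[Tuple[str, int], List[int]] = defaultdict(lambda: [0, 0])
    for o in offers:
        # LEFT JOIN r: same offer_id, r.rowtime in [o.rowtime, o.rowtime + 1h]
        matched_r: List[Optional[OrderRequested]] = [
            r for r in requests
            if r.offer_id == o.offer_id and o.rowtime <= r.rowtime <= o.rowtime + HOUR_MS
        ]
        for r in matched_r or [None]:
            # LEFT JOIN c: same order_id, c.rowtime in [r.rowtime, r.rowtime + 1h];
            # a NULL r can never satisfy r.order_id = c.order_id.
            matched_c: List[Optional[OrderCancelled]] = []
            if r is not None:
                matched_c = [
                    c for c in cancels
                    if c.order_id == r.order_id and r.rowtime <= c.rowtime <= r.rowtime + HOUR_MS
                ]
            for c in matched_c or [None]:
                # Tumbling window start (valid for non-negative timestamps).
                window_start = o.rowtime - o.rowtime % WINDOW_MS
                bucket = agg[(o.region_code, window_start)]
                # CASE WHEN r.order_id IS NOT NULL AND c.order_id IS NULL THEN 1 ELSE 0 END
                bucket[0] += 1 if (r is not None and c is None) else 0
                bucket[1] += 1  # count(1)
    # concat_ws('/', CAST(sum(...) AS VARCHAR), CAST(count(1) AS VARCHAR))
    return {key: f"{converted}/{total}" for key, (converted, total) in agg.items()}
```

For example, take three offers in the first 5-minute window:

- one that is ordered,
- one that is ordered and then cancelled within the hour,
- one that is never ordered.

Together they give "1/3" for that window. A model like this can serve as the expected output when comparing unit-test and production results.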
SELECT o.region_code,
       concat_ws(
         '/',
         CAST(SUM(CASE WHEN r.ride_id IS NOT NULL AND c.ride_id IS NULL THEN 1 ELSE 0 END) AS VARCHAR),
         CAST(COUNT(1) AS VARCHAR)
       ) AS ss_session_conversion_region_5m,
       TUMBLE_END(o.rowtime, INTERVAL '5' minute) AS latency_marker
FROM (
  SELECT region_code,
         offer_id,
         rowtime
  FROM event_offerings_offer_created
  WHERE offer_product_id = 'courier_saver'
    AND destination_lat IS NOT NULL
    AND destination_lng IS NOT NULL
) o
LEFT JOIN (
  SELECT offer_id,
         ride_id,
         rowtime
  FROM event_ride_requested
  WHERE analytical_product_name = 'courier_saver'
) r
ON o.offer_id = r.offer_id
AND o.rowtime BETWEEN r.rowtime - INTERVAL '1' hour AND r.rowtime
LEFT JOIN (
  SELECT ride_id,
         rowtime
  FROM event_ride_canceled
  WHERE analytical_product_name = 'courier_saver'
    AND NOT after_accepted
) c
ON r.ride_id = c.ride_id
AND r.rowtime BETWEEN c.rowtime - INTERVAL '1' hour AND c.rowtime
GROUP BY
  o.region_code,
  TUMBLE(o.rowtime, INTERVAL '5' minute)

Explain plan:

== Abstract Syntax Tree ==
LogicalProject(region_code=[$0], ss_session_conversion_region_5m=[CONCAT_WS(_UTF-16LE'/', CAST($2):VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" NOT NULL, CAST($3):VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" NOT NULL)], latency_marker=[TUMBLE_END($1)])
  LogicalAggregate(group=[{0, 1}], agg#0=[SUM($2)], agg#1=[COUNT()])
    LogicalProject(region_code=[$0], $f1=[TUMBLE($2, 300000)], $f2=[CASE(AND(IS NOT NULL($4), IS NULL($6)), 1, 0)], $f3=[1])
      LogicalJoin(condition=[AND(=($4, $6), >=($5, -($7, 3600000)), <=($5, $7))], joinType=[left])
        LogicalJoin(condition=[AND(=($1, $3), >=($2, -($5, 3600000)), <=($2, $5))], joinType=[left])
          LogicalProject(region_code=[$5], offer_id=[$4], rowtime=[$6])
            LogicalFilter(condition=[AND(=($3, _UTF-16LE'courier_saver'), IS NOT NULL($2), IS NOT NULL($1))])
              LogicalTableScan(table=[[event_offerings_offer_created]])
          LogicalProject(offer_id=[$3], ride_id=[$1], rowtime=[$4])
            LogicalFilter(condition=[=($2, _UTF-16LE'courier_saver')])
              LogicalTableScan(table=[[event_ride_requested]])
        LogicalProject(ride_id=[$1], rowtime=[$4])
          LogicalFilter(condition=[AND(=($3, _UTF-16LE'courier_saver'), NOT($2))])
            LogicalTableScan(table=[[event_ride_canceled]])

== Optimized Logical Plan ==
DataStreamCalc(select=[region_code, CONCAT_WS(_UTF-16LE'/', CAST($f1), CAST($f2)) AS ss_session_conversion_region_5m, w$end AS latency_marker])
  DataStreamGroupWindowAggregate(groupBy=[region_code], window=[TumblingGroupWindow('w$, 'rowtime, 300000.millis)], select=[region_code, $SUM0($f2) AS $f1, COUNT(*) AS $f2, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
    DataStreamCalc(select=[region_code, rowtime, CASE(AND(IS NOT NULL(ride_id), IS NULL(ride_id0)), 1, 0) AS $f2, 1 AS $f3])
      DataStreamWindowJoin(where=[AND(=(ride_id, ride_id0), >=(CAST(rowtime0), -(CAST(rowtime1), 3600000)), <=(CAST(rowtime0), CAST(rowtime1)))], join=[region_code, rowtime, ride_id, rowtime0, ride_id0, rowtime1], joinType=[LeftOuterJoin])
        DataStreamCalc(select=[region_code, rowtime, ride_id, rowtime0])
          DataStreamWindowJoin(where=[AND(=(offer_id, offer_id0), >=(CAST(rowtime), -(CAST(rowtime0), 3600000)), <=(CAST(rowtime), CAST(rowtime0)))], join=[region_code, offer_id, rowtime, offer_id0, ride_id, rowtime0], joinType=[LeftOuterJoin])
            DataStreamCalc(select=[region_code, offer_id, rowtime], where=[AND(=(offer_product_id, _UTF-16LE'courier_saver'), IS NOT NULL(destination_lat), IS NOT NULL(destination_lng))])
              DataStreamScan(table=[[event_offerings_offer_created]])
            DataStreamCalc(select=[offer_id, ride_id, rowtime], where=[=(analytical_product_name, _UTF-16LE'courier_saver')])
              DataStreamScan(table=[[event_ride_requested]])
        DataStreamCalc(select=[ride_id, rowtime], where=[AND(=(analytical_product_name, _UTF-16LE'courier_saver'), NOT(after_accepted))])
          DataStreamScan(table=[[event_ride_canceled]])

== Physical Execution Plan ==
Stage 1 : Data Source
  content : collect elements with CollectionInputFormat

Stage 2 : Operator
  content : Timestamps/Watermarks
  ship_strategy : FORWARD

Stage 3 : Operator
  content : extract_event_name
  ship_strategy : FORWARD

Stage 4 : Operator
  content : group_counter_ss_session_conversion_region_5m.1.rawKinesisInput
  ship_strategy : FORWARD

Stage 5 : Operator
  content : sample_with_formatter
  ship_strategy : FORWARD

Stage 6 : Operator
  content : only_offerings_offer_created
  ship_strategy : FORWARD

Stage 7 : Operator
  content : parse_offerings_offer_created
  ship_strategy : FORWARD

Stage 9 : Operator
  content : Timestamps/Watermarks
  ship_strategy : SHUFFLE

Stage 10 : Operator
  content : group_counter_event_offerings_offer_created.kinesis_records
  ship_strategy : FORWARD

Stage 11 : Operator
  content : sample_with_formatter
  ship_strategy : FORWARD

Stage 12 : Operator
  content : Process
  ship_strategy : FORWARD

Stage 13 : Operator
  content : only_ride_requested
  ship_strategy : FORWARD

Stage 14 : Operator
  content : parse_ride_requested
  ship_strategy : FORWARD

Stage 16 : Operator
  content : Timestamps/Watermarks
  ship_strategy : SHUFFLE

Stage 17 : Operator
  content : group_counter_event_ride_requested.kinesis_records
  ship_strategy : FORWARD

Stage 18 : Operator
  content : sample_with_formatter
  ship_strategy : FORWARD

Stage 19 : Operator
  content : Process
  ship_strategy : FORWARD

Stage 20 : Operator
  content : only_ride_canceled
  ship_strategy : FORWARD

Stage 21 : Operator
  content : parse_ride_canceled
  ship_strategy : FORWARD

Stage 23 : Operator
  content : Timestamps/Watermarks
  ship_strategy : SHUFFLE

Stage 24 : Operator
  content : group_counter_event_ride_canceled.kinesis_records
  ship_strategy : FORWARD

Stage 25 : Operator
  content : sample_with_formatter
  ship_strategy : FORWARD

Stage 26 : Operator
  content : Process
  ship_strategy : FORWARD

Stage 27 : Operator
  content : from: (occurred_at, destination_lng, destination_lat, offer_product_id, offer_id, region_code, rowtime)
  ship_strategy : FORWARD

Stage 28 : Operator
  content : where: (AND(=(offer_product_id, _UTF-16LE'courier_saver'), IS NOT NULL(destination_lat), IS NOT NULL(destination_lng))), select: (region_code, offer_id, rowtime)
  ship_strategy : FORWARD

Stage 29 : Operator
  content : from: (occurred_at, ride_id, analytical_product_name, offer_id, rowtime)
  ship_strategy : FORWARD

Stage 30 : Operator
  content : where: (=(analytical_product_name, _UTF-16LE'courier_saver')), select: (offer_id, ride_id, rowtime)
  ship_strategy : FORWARD

Stage 33 : Operator
  content : where: (AND(=(offer_id, offer_id0), >=(CAST(rowtime), -(CAST(rowtime0), 3600000)), <=(CAST(rowtime), CAST(rowtime0)))), join: (region_code, offer_id, rowtime, offer_id0, ride_id, rowtime0)
  ship_strategy : HASH

Stage 34 : Operator
  content : select: (region_code, rowtime, ride_id, rowtime0)
  ship_strategy : FORWARD

Stage 35 : Operator
  content : from: (occurred_at, ride_id, after_accepted, analytical_product_name, rowtime)
  ship_strategy : FORWARD

Stage 36 : Operator
  content : where: (AND(=(analytical_product_name, _UTF-16LE'courier_saver'), NOT(after_accepted))), select: (ride_id, rowtime)
  ship_strategy : FORWARD

Stage 39 : Operator
  content : where: (AND(=(ride_id, ride_id0), >=(CAST(rowtime0), -(CAST(rowtime1), 3600000)), <=(CAST(rowtime0), CAST(rowtime1)))), join: (region_code, rowtime, ride_id, rowtime0, ride_id0, rowtime1)
  ship_strategy : HASH

Stage 40 : Operator
  content : select: (region_code, rowtime, CASE(AND(IS NOT NULL(ride_id), IS NULL(ride_id0)), 1, 0) AS $f2, 1 AS $f3)
  ship_strategy : FORWARD

Stage 41 : Operator
  content : time attribute: (rowtime)
  ship_strategy : FORWARD

Stage 43 : Operator
  content : groupBy: (region_code), window: (TumblingGroupWindow('w$, 'rowtime, 300000.millis)), select: (region_code, $SUM0($f2) AS $f1, COUNT(*) AS $f2, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime)
  ship_strategy : HASH

Stage 44 : Operator
  content : select: (region_code, CONCAT_WS(_UTF-16LE'/', CAST($f1), CAST($f2)) AS ss_session_conversion_region_5m, w$end AS latency_marker)
  ship_strategy : FORWARD

Stage 53 : Operator
  content : from: (occurred_at, destination_lng, destination_lat, offer_product_id, offer_id, region_code, rowtime)
  ship_strategy : FORWARD

Stage 54 : Operator
  content : where: (AND(=(offer_product_id, _UTF-16LE'courier_saver'), IS NOT NULL(destination_lat), IS NOT NULL(destination_lng))), select: (region_code, offer_id, rowtime)
  ship_strategy : FORWARD

Stage 55 : Operator
  content : from: (occurred_at, ride_id, analytical_product_name, offer_id, rowtime)
  ship_strategy : FORWARD

Stage 56 : Operator
  content : where: (=(analytical_product_name, _UTF-16LE'courier_saver')), select: (offer_id, ride_id, rowtime)
  ship_strategy : FORWARD

Stage 59 : Operator
  content : where: (AND(=(offer_id, offer_id0), >=(CAST(rowtime), -(CAST(rowtime0), 3600000)), <=(CAST(rowtime), CAST(rowtime0)))), join: (region_code, offer_id, rowtime, offer_id0, ride_id, rowtime0)
  ship_strategy : HASH

Stage 60 : Operator
  content : select: (region_code, rowtime, ride_id, rowtime0)
  ship_strategy : FORWARD

Stage 61 : Operator
  content : from: (occurred_at, ride_id, after_accepted, analytical_product_name, rowtime)
  ship_strategy : FORWARD

Stage 62 : Operator
  content : where: (AND(=(analytical_product_name, _UTF-16LE'courier_saver'), NOT(after_accepted))), select: (ride_id, rowtime)
  ship_strategy : FORWARD

Stage 65 : Operator
  content : where: (AND(=(ride_id, ride_id0), >=(CAST(rowtime0), -(CAST(rowtime1), 3600000)), <=(CAST(rowtime0), CAST(rowtime1)))), join: (region_code, rowtime, ride_id, rowtime0, ride_id0, rowtime1)
  ship_strategy : HASH

Stage 66 : Operator
  content : select: (region_code, rowtime, CASE(AND(IS NOT NULL(ride_id), IS NULL(ride_id0)), 1, 0) AS $f2, 1 AS $f3)
  ship_strategy : FORWARD

Stage 67 : Operator
  content : time attribute: (rowtime)
  ship_strategy : FORWARD

Stage 69 : Operator
  content : groupBy: (region_code), window: (TumblingGroupWindow('w$, 'rowtime, 300000.millis)), select: (region_code, $SUM0($f2) AS $f1, COUNT(*) AS $f2, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime)
  ship_strategy : HASH

Stage 70 : Operator
  content : select: (region_code, CONCAT_WS(_UTF-16LE'/', CAST($f1), CAST($f2)) AS ss_session_conversion_region_5m, w$end AS latency_marker)
  ship_strategy : FORWARD
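The physical plan shows a Timestamps/Watermarks operator on each input stream. The periodic strategy described in this thread computes the watermark as the maximum timestamp seen so far minus 5 seconds, and it can be modelled roughly as below. This is a hypothetical, stdlib-only sketch, not Flink's own extractor class, and the class and function names are illustrative.

The sketch also includes the rule that an operator with several inputs, such as the window joins in the plan, advances its event time only to the minimum of its input watermarks. An input whose watermark stops advancing therefore also stops the operator's watermark.

```python
from typing import Optional


class BoundedOutOfOrdernessWatermarks:
    """Hypothetical model of a periodic watermark assigner.

    The watermark trails the largest event timestamp seen so far by a fixed
    bound (5 seconds in the thread). Timestamps are epoch milliseconds.
    """

    def __init__(self, max_out_of_orderness_ms: int) -> None:
        self.max_out_of_orderness_ms = max_out_of_orderness_ms
        self.max_timestamp_ms: Optional[int] = None

    def on_event(self, timestamp_ms: int) -> None:
        # Only a larger timestamp moves the bound forward; late events do not.
        if self.max_timestamp_ms is None or timestamp_ms > self.max_timestamp_ms:
            self.max_timestamp_ms = timestamp_ms

    def current_watermark(self) -> Optional[int]:
        # Called periodically (for example on a timer) to emit the watermark.
        if self.max_timestamp_ms is None:
            return None
        return self.max_timestamp_ms - self.max_out_of_orderness_ms


def combined_watermark(*input_watermarks: int) -> int:
    """Event time of an operator with several inputs: the minimum of them."""
    return min(input_watermarks)
```

For example, after events at 10 s and 8 s the watermark is 5 s. A later event at 20 s moves it to 15 s. A join whose other input is still at 3 s stays at 3 s.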