To be clear, I want the outer grouping to have a longer retention time (on the order of a week or a month, for which we are using the idle state retention time) and the inner grouping to have a shorter retention period (1 hour max). So I am hoping the session window will do the right thing.
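The inner 1-hour session window described above can be pictured outside Flink. The following is a minimal Python sketch (not Flink code; the timestamps are made up for illustration) of why duplicates arriving within the gap collapse into a single session per (user_id, event_id) key:

```python
from datetime import datetime, timedelta

GAP = timedelta(hours=1)  # matches SESSION(rowtime, INTERVAL '1' HOUR)

def session_windows(timestamps, gap=GAP):
    """Merge event timestamps into session windows: a new session
    starts whenever the silence since the previous event exceeds
    `gap` -- the semantics a SESSION() group window applies per
    grouping key."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] <= gap:
            sessions[-1].append(ts)  # within the gap: same session
        else:
            sessions.append([ts])    # gap exceeded: new session
    return sessions

# Duplicates of one (user_id, event_id) arriving 30 minutes apart
# collapse into a single session, so the outer monthly COUNT sees
# the event once; a copy arriving 2.5 hours later would not.
dups = [datetime(2019, 6, 1, 10, 0), datetime(2019, 6, 1, 10, 30)]
late = dups + [datetime(2019, 6, 1, 13, 0)]
print(len(session_windows(dups)), len(session_windows(late)))  # 1 2
```

This also shows the failure mode: if a duplicate ever arrived more than an hour after the previous copy, it would open a new session and be counted twice, so the scheme leans on the "duplicates arrive within an hour" assumption.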
Thanks,
Vinod

On Tue, Jun 4, 2019 at 5:14 PM Vinod Mehra <vme...@lyft.com> wrote:

> Thanks a lot Fabian for the detailed response. I know all the duplicates
> are going to arrive within an hour max of the actual event, so using a
> 1-hour running session window should be fine for me.
>
> Is the following the right way to do it in apache-flink-1.4.2?
>
>     SELECT
>         CONCAT_WS(
>             '-',
>             CAST(event_month AS VARCHAR),
>             CAST(event_year AS VARCHAR),
>             CAST(user_id AS VARCHAR)
>         ),
>         COUNT(event_id) AS event_count
>     FROM (
>         SELECT
>             user_id,
>             event_id,
>             MAX(MONTH(longToDateTime(rowtime))) AS event_month,
>             MAX(YEAR(longToDateTime(rowtime))) AS event_year
>         FROM event_foo
>         GROUP BY event_id, user_id,
>             SESSION(rowtime, INTERVAL '1' HOUR) -- 1-hour running session window
>     )
>     GROUP BY user_id, event_month, event_year
>
> We are also using idle state retention time to clean up unused state, but
> that is much longer (a week or a month depending on the use case). We will
> switch to COUNT(DISTINCT) as soon as we move to a newer Flink version, so
> the above nested SELECT is going to be a stop gap until then.
>
> Thanks,
> Vinod
>
> On Mon, Jun 3, 2019 at 4:52 AM Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Vinod,
>>
>> IIRC, support for DISTINCT aggregates was added in Flink 1.6.0 (released
>> August 9th, 2018) [1].
>> Also note that by default, this query will accumulate more and more
>> state, i.e., for each grouping key it will hold all unique event_ids.
>> You could configure an idle state retention time [2] to clean up unused
>> state.
>>
>> Regarding the boundaries, with the current query they are fixed to one
>> month and sharply cut (as one would expect).
>> You could try to use a long running session window [3]. This would also
>> remove the need for the idle state configuration because Flink would know
>> when state can be discarded.
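The idle state retention idea Fabian suggests can be pictured as a per-key last-access timestamp plus an eviction pass. The following is a simplified Python sketch, not Flink's actual implementation (Flink registers cleanup timers between a configured minimum and maximum retention); the keys and timings are made up for illustration:

```python
class KeyedState:
    """Toy model of idle state retention: each grouping key's state
    carries a last-access time, and a cleanup pass evicts keys that
    have been idle longer than the retention period. Flink's real
    mechanism (cleanup timers between a min and max retention)
    differs in detail but has the same effect."""

    def __init__(self, retention_secs):
        self.retention = retention_secs
        self.state = {}  # key -> (value, last_access_secs)

    def update(self, key, value, now):
        self.state[key] = (value, now)

    def cleanup(self, now):
        for key in [k for k, (_, last) in self.state.items()
                    if now - last > self.retention]:
            del self.state[key]

DAY = 24 * 3600
s = KeyedState(retention_secs=7 * DAY)         # one-week retention
s.update(("user_1", "evt_a"), 1, now=0)
s.update(("user_1", "evt_b"), 1, now=6 * DAY)  # touched again on day 6
s.cleanup(now=8 * DAY)                         # evt_a idle for 8 days
print(sorted(k[1] for k in s.state))           # ['evt_b']
```

The trade-off Fabian points at: with plain GROUP BY, Flink cannot know when a key is finished and must rely on this idle timeout, whereas a session window gives it an explicit signal (gap expiry plus allowed lateness) for discarding state.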
>>
>> Hope this helps,
>> Fabian
>>
>> [1] https://flink.apache.org/news/2018/08/09/release-1.6.0.html#enhancing-sql-and-table-api
>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/query_configuration.html#idle-state-retention-time
>> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/sql.html#group-windows
>>
>> On Thu, May 30, 2019 at 02:18, Vinod Mehra <vme...@lyft.com> wrote:
>>
>>> Another interesting thing is that if I add DISTINCT in the 2nd query it
>>> doesn't complain. But because of the inner SELECT it is a no-op: the
>>> inner SELECT is already doing the deduping.
>>>
>>>     SELECT
>>>         CONCAT_WS(
>>>             '-',
>>>             CAST(MONTH(row_datetime) AS VARCHAR),
>>>             CAST(YEAR(row_datetime) AS VARCHAR),
>>>             CAST(user_id AS VARCHAR)
>>>         ),
>>>         COUNT(DISTINCT(event_id)) AS event_count -- note the DISTINCT keyword
>>>                                                  -- here. Flink doesn't barf
>>>                                                  -- for this.
>>>     FROM (
>>>         SELECT
>>>             user_id,
>>>             event_id,
>>>             maxtimestamp(longToDateTime(rowtime)) AS row_datetime
>>>         FROM event_foo
>>>         GROUP BY event_id, user_id
>>>     )
>>>     GROUP BY user_id, MONTH(row_datetime), YEAR(row_datetime)
>>>
>>> On Wed, May 29, 2019 at 5:15 PM Vinod Mehra <vme...@lyft.com> wrote:
>>>
>>>> More details on the error with query #1 that used COUNT(DISTINCT()):
>>>>
>>>> org.apache.flink.table.api.TableException: Cannot generate a valid
>>>> execution plan for the given query:
>>>>
>>>> FlinkLogicalCalc(expr#0..8=[{inputs}], expr#9=[_UTF-16LE'-'],
>>>>   expr#10=[CAST($t1):VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE
>>>>   "ISO-8859-1$en_US$primary"], expr#11=[CAST($t2):VARCHAR(65536) CHARACTER
>>>>   SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary"],
>>>>   expr#12=[CAST($t0):VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE
>>>>   "ISO-8859-1$en_US$primary"], expr#13=[CONCAT_WS($t9, $t10, $t11, $t12)],
>>>>   EXPR$0=[$t13], mastercard_world_elite_monthly_rides_encoded=[$t8],
>>>>   lower_boundary=[$t3], latency_marker=[$t4])
>>>>   FlinkLogicalJoin(condition=[AND(IS NOT DISTINCT FROM($0, $5), IS NOT
>>>>     DISTINCT FROM($1, $6), IS NOT DISTINCT FROM($2, $7))], joinType=[inner])
>>>>     FlinkLogicalAggregate(group=[{0, 1, 2}],
>>>>       lower_boundary=[mintimestamp($4)], latency_marker=[maxtimestamp($4)])
>>>>       FlinkLogicalCalc(expr#0..6=[{inputs}], expr#7=[FLAG(MONTH)],
>>>>         expr#8=[CAST($t6):TIMESTAMP(3) NOT NULL], expr#9=[longToDateTime($t8)],
>>>>         expr#10=[Reinterpret($t9)], expr#11=[86400000], expr#12=[/INT($t10, $t11)],
>>>>         expr#13=[EXTRACT_DATE($t7, $t12)], expr#14=[FLAG(YEAR)],
>>>>         expr#15=[EXTRACT_DATE($t14, $t12)], expr#16=[_UTF-16LE'USD'],
>>>>         expr#17=[=($t4, $t16)], expr#18=[_UTF-16LE'MWE'], expr#19=[=($t2, $t18)],
>>>>         expr#20=[NOT($t5)], expr#21=[AND($t17, $t19, $t20)], user_lyft_id=[$t3],
>>>>         $f1=[$t13], $f2=[$t15], ride_id=[$t1], $f4=[$t9], $condition=[$t21])
>>>>         FlinkLogicalNativeTableScan(table=[[event_pay_passenger_payment_succeeded_for_rewards]])
>>>>     FlinkLogicalAggregate(group=[{0, 1, 2}],
>>>>       mastercard_world_elite_monthly_rides_encoded=[COUNT($3)])
>>>>       FlinkLogicalAggregate(group=[{0, 1, 2, 3}])
>>>>         FlinkLogicalCalc(expr#0..6=[{inputs}], expr#7=[FLAG(MONTH)],
>>>>           expr#8=[CAST($t6):TIMESTAMP(3) NOT NULL], expr#9=[longToDateTime($t8)],
>>>>           expr#10=[Reinterpret($t9)], expr#11=[86400000], expr#12=[/INT($t10, $t11)],
>>>>           expr#13=[EXTRACT_DATE($t7, $t12)], expr#14=[FLAG(YEAR)],
>>>>           expr#15=[EXTRACT_DATE($t14, $t12)], expr#16=[_UTF-16LE'USD'],
>>>>           expr#17=[=($t4, $t16)], expr#18=[_UTF-16LE'MWE'], expr#19=[=($t2, $t18)],
>>>>           expr#20=[NOT($t5)], expr#21=[AND($t17, $t19, $t20)], user_lyft_id=[$t3],
>>>>           $f1=[$t13], $f2=[$t15], ride_id=[$t1], $condition=[$t21])
>>>>           FlinkLogicalNativeTableScan(table=[[event_pay_passenger_payment_succeeded_for_rewards]])
>>>>
>>>> This exception indicates that the query uses an unsupported SQL feature.
>>>> Please check the documentation for the set of currently supported SQL
>>>> features.
>>>>
>>>> at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:274)
>>>> at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:683)
>>>> at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:730)
>>>> at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:414)
>>>>
>>>> On Wed, May 29, 2019 at 1:49 PM Vinod Mehra <vme...@lyft.com> wrote:
>>>>
>>>>> Hi!
>>>>>
>>>>> We are using apache-flink-1.4.2. It seems this version doesn't support
>>>>> COUNT(DISTINCT). I am trying to find a way to dedup the stream, so I
>>>>> tried:
>>>>>
>>>>>     SELECT
>>>>>         CONCAT_WS(
>>>>>             '-',
>>>>>             CAST(MONTH(longToDateTime(rowtime)) AS VARCHAR),
>>>>>             CAST(YEAR(longToDateTime(rowtime)) AS VARCHAR),
>>>>>             CAST(user_id AS VARCHAR)
>>>>>         ),
>>>>>         COUNT(DISTINCT(event_id)) AS event_count
>>>>>     FROM event_foo
>>>>>     GROUP BY user_id, MONTH(longToDateTime(rowtime)),
>>>>>         YEAR(longToDateTime(rowtime))
>>>>>
>>>>> (The duplicate events have the same event_id (and user_id); the other
>>>>> fields, e.g. timestamps, may or may not be different.)
>>>>>
>>>>> But that failed because DISTINCT is not supported.
>>>>> As a workaround I tried:
>>>>>
>>>>>     SELECT
>>>>>         CONCAT_WS(
>>>>>             '-',
>>>>>             CAST(MONTH(row_datetime) AS VARCHAR),
>>>>>             CAST(YEAR(row_datetime) AS VARCHAR),
>>>>>             CAST(user_id AS VARCHAR)
>>>>>         ),
>>>>>         COUNT(event_id) AS event_count
>>>>>     FROM (
>>>>>         SELECT
>>>>>             user_id,
>>>>>             event_id,
>>>>>             maxtimestamp(longToDateTime(rowtime)) AS row_datetime
>>>>>         FROM event_foo
>>>>>         GROUP BY event_id, user_id
>>>>>     )
>>>>>     GROUP BY user_id, MONTH(row_datetime), YEAR(row_datetime)
>>>>>
>>>>> I am hoping the inner SELECT does the deduping, because logically it
>>>>> is equivalent to a DISTINCT. This works in my functional testing.
>>>>>
>>>>> Will it also work if the dedups span different event buckets? I was
>>>>> hoping that as long as the events arrive within the state "retention
>>>>> time" in Flink they should be deduped, but I am new to Flink so I am
>>>>> not sure about that. Can someone please correct me if I am wrong? Is
>>>>> this a reasonable workaround for the lack of DISTINCT support? Please
>>>>> let me know if there is a better way.
>>>>>
>>>>> Thanks,
>>>>> Vinod
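On the question of dedups spanning buckets: in the nested query the inner aggregate keeps one row per (user_id, event_id), so a duplicate only updates that row, and the update reaches the outer aggregate as a retraction of the old row plus the new one. A rough Python sketch of those semantics, assuming maxtimestamp() behaves like a plain max and using month alone to stand in for the (month, year) key (user/event names are made up):

```python
from datetime import datetime

inner = {}    # (user_id, event_id) -> max rowtime seen so far
monthly = {}  # (user_id, month)    -> count of deduped events

def on_event(user_id, event_id, rowtime):
    """One row per (user_id, event_id) survives the inner GROUP BY;
    a duplicate only updates that row's max timestamp. The outer
    aggregate sees the update as a retraction plus a new row."""
    key = (user_id, event_id)
    old = inner.get(key)
    new = rowtime if old is None or rowtime > old else old
    inner[key] = new
    if old is None:
        monthly[(user_id, new.month)] = monthly.get((user_id, new.month), 0) + 1
    elif new.month != old.month:
        # retraction: the deduped event moves to the later month's bucket
        monthly[(user_id, old.month)] -= 1
        monthly[(user_id, new.month)] = monthly.get((user_id, new.month), 0) + 1

# A duplicate straddling the May/June boundary is still deduped (one
# inner row), but the max timestamp relocates the event into June.
on_event("u1", "e1", datetime(2019, 5, 31, 23, 50))
on_event("u1", "e1", datetime(2019, 6, 1, 0, 10))
print(monthly[("u1", 5)], monthly[("u1", 6)])  # 0 1
```

So, under these assumptions, duplicates are deduped across buckets for as long as the inner state survives the idle state retention time, with the caveat that a late duplicate can shift the event into the later bucket because of the MAX.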