More details on the error with query #1 that used COUNT(DISTINCT()):

org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:

FlinkLogicalCalc(expr#0..8=[{inputs}], expr#9=[_UTF-16LE'-'], expr#10=[CAST($t1):VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary"], expr#11=[CAST($t2):VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary"], expr#12=[CAST($t0):VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary"], expr#13=[CONCAT_WS($t9, $t10, $t11, $t12)], EXPR$0=[$t13], mastercard_world_elite_monthly_rides_encoded=[$t8], lower_boundary=[$t3], latency_marker=[$t4])
  FlinkLogicalJoin(condition=[AND(IS NOT DISTINCT FROM($0, $5), IS NOT DISTINCT FROM($1, $6), IS NOT DISTINCT FROM($2, $7))], joinType=[inner])
    FlinkLogicalAggregate(group=[{0, 1, 2}], lower_boundary=[mintimestamp($4)], latency_marker=[maxtimestamp($4)])
      FlinkLogicalCalc(expr#0..6=[{inputs}], expr#7=[FLAG(MONTH)], expr#8=[CAST($t6):TIMESTAMP(3) NOT NULL], expr#9=[longToDateTime($t8)], expr#10=[Reinterpret($t9)], expr#11=[86400000], expr#12=[/INT($t10, $t11)], expr#13=[EXTRACT_DATE($t7, $t12)], expr#14=[FLAG(YEAR)], expr#15=[EXTRACT_DATE($t14, $t12)], expr#16=[_UTF-16LE'USD'], expr#17=[=($t4, $t16)], expr#18=[_UTF-16LE'MWE'], expr#19=[=($t2, $t18)], expr#20=[NOT($t5)], expr#21=[AND($t17, $t19, $t20)], user_lyft_id=[$t3], $f1=[$t13], $f2=[$t15], ride_id=[$t1], $f4=[$t9], $condition=[$t21])
        FlinkLogicalNativeTableScan(table=[[event_pay_passenger_payment_succeeded_for_rewards]])
    FlinkLogicalAggregate(group=[{0, 1, 2}], mastercard_world_elite_monthly_rides_encoded=[COUNT($3)])
      FlinkLogicalAggregate(group=[{0, 1, 2, 3}])
        FlinkLogicalCalc(expr#0..6=[{inputs}], expr#7=[FLAG(MONTH)], expr#8=[CAST($t6):TIMESTAMP(3) NOT NULL], expr#9=[longToDateTime($t8)], expr#10=[Reinterpret($t9)], expr#11=[86400000], expr#12=[/INT($t10, $t11)], expr#13=[EXTRACT_DATE($t7, $t12)], expr#14=[FLAG(YEAR)], expr#15=[EXTRACT_DATE($t14, $t12)], expr#16=[_UTF-16LE'USD'], expr#17=[=($t4, $t16)], expr#18=[_UTF-16LE'MWE'], expr#19=[=($t2, $t18)], expr#20=[NOT($t5)], expr#21=[AND($t17, $t19, $t20)],
user_lyft_id=[$t3], $f1=[$t13], $f2=[$t15], ride_id=[$t1], $condition=[$t21])
          FlinkLogicalNativeTableScan(table=[[event_pay_passenger_payment_succeeded_for_rewards]])

This exception indicates that the query uses an unsupported SQL feature. Please check the documentation for the set of currently supported SQL features.
	at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:274)
	at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:683)
	at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:730)
	at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:414)

On Wed, May 29, 2019 at 1:49 PM Vinod Mehra <vme...@lyft.com> wrote:

> Hi!
>
> We are using apache-flink-1.4.2. It seems this version doesn't support
> COUNT(DISTINCT). I am trying to find a way to dedup the stream. So I tried:
>
> SELECT
>   CONCAT_WS(
>     '-',
>     CAST(MONTH(longToDateTime(rowtime)) AS VARCHAR),
>     CAST(YEAR(longToDateTime(rowtime)) AS VARCHAR),
>     CAST(user_id AS VARCHAR)
>   ),
>   COUNT(DISTINCT(event_id)) AS event_count
> FROM event_foo
> GROUP BY user_id, MONTH(longToDateTime(rowtime)),
>   YEAR(longToDateTime(rowtime))
>
> (The duplicate events have the same 'event_id' (and user_id); the other
> fields, e.g. timestamps, may or may not be different.)
>
> But that failed because DISTINCT is not supported.
> As a workaround I tried:
>
> SELECT
>   CONCAT_WS(
>     '-',
>     CAST(MONTH(row_datetime) AS VARCHAR),
>     CAST(YEAR(row_datetime) AS VARCHAR),
>     CAST(user_id AS VARCHAR)
>   ),
>   COUNT(event_id) AS event_count
> FROM (
>   SELECT
>     user_id,
>     event_id,
>     maxtimestamp(longToDateTime(rowtime)) AS row_datetime
>   FROM event_foo
>   GROUP BY event_id, user_id
> )
> GROUP BY user_id, MONTH(row_datetime), YEAR(row_datetime)
>
> I am hoping the inner SELECT does the deduping, because logically it is
> equivalent to a DISTINCT. This works in my functional testing.
>
> Will it also work if the dedups span different event buckets? I was hoping
> that as long as the events arrive within the state "retention time" in
> Flink they should be deduped, but I am new to Flink so I am not sure about
> that. Can someone please correct me if I am wrong? Is this a reasonable
> workaround for the lack of DISTINCT support? Please let me know if there is a
> better way.
>
> Thanks,
> Vinod
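Not a Flink answer, but the logical equivalence claimed in the quoted mail (COUNT(DISTINCT event_id) vs. counting over an inner GROUP BY that collapses duplicates) is easy to sanity-check on a small batch dataset. A minimal sketch using Python's sqlite3, with made-up sample rows; the table and column names mirror the queries above, and this only verifies the set-level equivalence of the two queries, not Flink's streaming/state-retention behavior:

```python
import sqlite3

# Hypothetical sample data: user 1 sees event e1 twice (a duplicate with a
# different timestamp) plus e2; user 2 sees e3 once.
rows = [
    (1, "e1", 100),
    (1, "e1", 150),  # duplicate of e1, later rowtime
    (1, "e2", 200),
    (2, "e3", 300),
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE event_foo (user_id INT, event_id TEXT, rowtime INT)")
conn.executemany("INSERT INTO event_foo VALUES (?, ?, ?)", rows)

# Form 1: the COUNT(DISTINCT ...) aggregate that Flink 1.4.2 rejects.
distinct_counts = conn.execute(
    "SELECT user_id, COUNT(DISTINCT event_id) FROM event_foo "
    "GROUP BY user_id ORDER BY user_id"
).fetchall()

# Form 2: the nested-GROUP-BY workaround -- the inner aggregate keeps one row
# per (user_id, event_id) pair, so the outer COUNT sees each event once.
workaround_counts = conn.execute(
    """
    SELECT user_id, COUNT(event_id)
    FROM (
        SELECT user_id, event_id, MAX(rowtime) AS row_datetime
        FROM event_foo
        GROUP BY event_id, user_id
    )
    GROUP BY user_id ORDER BY user_id
    """
).fetchall()

print(distinct_counts)    # [(1, 2), (2, 1)]
print(workaround_counts)  # [(1, 2), (2, 1)]
assert distinct_counts == workaround_counts
```

On a bounded table the two forms agree. The open question in the mail is the streaming case: the inner aggregate's dedup only holds while Flink retains state for each (user_id, event_id) group, and because maxtimestamp moves as late duplicates arrive, a duplicate can shift an event into a different month/year bucket, which the retract stream would then correct downstream.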