Another interesting thing: if I add DISTINCT in the 2nd query it doesn't complain. But because of the inner SELECT it is a no-op, since the inner SELECT is already doing the deduping:
SELECT
  CONCAT_WS(
    '-',
    CAST(MONTH(row_datetime) AS VARCHAR),
    CAST(YEAR(row_datetime) AS VARCHAR),
    CAST(user_id AS VARCHAR)
  ),
  COUNT(DISTINCT(event_id)) AS event_count  -- note the DISTINCT keyword here; Flink doesn't barf at this
FROM (
  SELECT
    user_id,
    event_id,
    maxtimestamp(longToDateTime(rowtime)) AS row_datetime
  FROM event_foo
  GROUP BY event_id, user_id
)
GROUP BY user_id, MONTH(row_datetime), YEAR(row_datetime)

On Wed, May 29, 2019 at 5:15 PM Vinod Mehra <vme...@lyft.com> wrote:

> More details on the error with query #1 that used COUNT(DISTINCT()):
>
> org.apache.flink.table.api.TableException: Cannot generate a valid
> execution plan for the given query:
>
> FlinkLogicalCalc(expr#0..8=[{inputs}], expr#9=[_UTF-16LE'-'], expr#10=[CAST($t1):VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary"], expr#11=[CAST($t2):VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary"], expr#12=[CAST($t0):VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary"], expr#13=[CONCAT_WS($t9, $t10, $t11, $t12)], EXPR$0=[$t13], mastercard_world_elite_monthly_rides_encoded=[$t8], lower_boundary=[$t3], latency_marker=[$t4])
>   FlinkLogicalJoin(condition=[AND(IS NOT DISTINCT FROM($0, $5), IS NOT DISTINCT FROM($1, $6), IS NOT DISTINCT FROM($2, $7))], joinType=[inner])
>     FlinkLogicalAggregate(group=[{0, 1, 2}], lower_boundary=[mintimestamp($4)], latency_marker=[maxtimestamp($4)])
>       FlinkLogicalCalc(expr#0..6=[{inputs}], expr#7=[FLAG(MONTH)], expr#8=[CAST($t6):TIMESTAMP(3) NOT NULL], expr#9=[longToDateTime($t8)], expr#10=[Reinterpret($t9)], expr#11=[86400000], expr#12=[/INT($t10, $t11)], expr#13=[EXTRACT_DATE($t7, $t12)], expr#14=[FLAG(YEAR)], expr#15=[EXTRACT_DATE($t14, $t12)], expr#16=[_UTF-16LE'USD'], expr#17=[=($t4, $t16)], expr#18=[_UTF-16LE'MWE'], expr#19=[=($t2, $t18)], expr#20=[NOT($t5)], expr#21=[AND($t17, $t19, $t20)], user_lyft_id=[$t3], $f1=[$t13], $f2=[$t15], ride_id=[$t1], $f4=[$t9],
> $condition=[$t21])
>         FlinkLogicalNativeTableScan(table=[[event_pay_passenger_payment_succeeded_for_rewards]])
>     FlinkLogicalAggregate(group=[{0, 1, 2}], mastercard_world_elite_monthly_rides_encoded=[COUNT($3)])
>       FlinkLogicalAggregate(group=[{0, 1, 2, 3}])
>         FlinkLogicalCalc(expr#0..6=[{inputs}], expr#7=[FLAG(MONTH)], expr#8=[CAST($t6):TIMESTAMP(3) NOT NULL], expr#9=[longToDateTime($t8)], expr#10=[Reinterpret($t9)], expr#11=[86400000], expr#12=[/INT($t10, $t11)], expr#13=[EXTRACT_DATE($t7, $t12)], expr#14=[FLAG(YEAR)], expr#15=[EXTRACT_DATE($t14, $t12)], expr#16=[_UTF-16LE'USD'], expr#17=[=($t4, $t16)], expr#18=[_UTF-16LE'MWE'], expr#19=[=($t2, $t18)], expr#20=[NOT($t5)], expr#21=[AND($t17, $t19, $t20)], user_lyft_id=[$t3], $f1=[$t13], $f2=[$t15], ride_id=[$t1], $condition=[$t21])
>           FlinkLogicalNativeTableScan(table=[[event_pay_passenger_payment_succeeded_for_rewards]])
>
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL
> features.
>
>   at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:274)
>   at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:683)
>   at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:730)
>   at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:414)
>
> On Wed, May 29, 2019 at 1:49 PM Vinod Mehra <vme...@lyft.com> wrote:
>
>> Hi!
>>
>> We are using apache-flink-1.4.2. It seems this version doesn't support
>> count(DISTINCT). I am trying to find a way to dedup the stream.
>> So I tried:
>>
>> SELECT
>>   CONCAT_WS(
>>     '-',
>>     CAST(MONTH(longToDateTime(rowtime)) AS VARCHAR),
>>     CAST(YEAR(longToDateTime(rowtime)) AS VARCHAR),
>>     CAST(user_id AS VARCHAR)
>>   ),
>>   COUNT(DISTINCT(event_id)) AS event_count
>> FROM event_foo
>> GROUP BY user_id, MONTH(longToDateTime(rowtime)),
>>   YEAR(longToDateTime(rowtime))
>>
>> (The duplicate events have the same 'event_id' (and user_id); the other
>> fields, e.g. timestamps, may or may not be different.)
>>
>> But that failed because DISTINCT is not supported. As a workaround I
>> tried:
>>
>> SELECT
>>   CONCAT_WS(
>>     '-',
>>     CAST(MONTH(row_datetime) AS VARCHAR),
>>     CAST(YEAR(row_datetime) AS VARCHAR),
>>     CAST(user_id AS VARCHAR)
>>   ),
>>   COUNT(event_id) AS event_count
>> FROM (
>>   SELECT
>>     user_id,
>>     event_id,
>>     maxtimestamp(longToDateTime(rowtime)) AS row_datetime
>>   FROM event_foo
>>   GROUP BY event_id, user_id
>> )
>> GROUP BY user_id, MONTH(row_datetime), YEAR(row_datetime)
>>
>> I am hoping the inner SELECT does the deduping, because logically it is
>> equivalent to a DISTINCT. This works in my functional testing.
>>
>> Will it also work if the dedups span different event buckets? I was
>> hoping that as long as the events arrive within the state "retention
>> time" in Flink they should be deduped, but I am new to Flink so I am not
>> sure about that. Can someone please correct me if I am wrong? Is this a
>> reasonable workaround for the lack of DISTINCT support? Please let me
>> know if there is a better way.
>>
>> Thanks,
>> Vinod
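[Editor's note] The equivalence claimed in the thread (an inner GROUP BY on event_id, user_id dedups the rows, so a plain outer COUNT matches COUNT(DISTINCT event_id) on the raw stream) can be sketched outside Flink. This is a minimal Python model, not Flink code; the toy rows are invented, and the month/year bucketing of the real queries is ignored for simplicity:

```python
from collections import defaultdict

# Toy rows: (user_id, event_id, rowtime as epoch millis). The two rows with
# event_id "e1" for user "u1" are duplicates that differ only in timestamp.
rows = [
    ("u1", "e1", 1559100000000),
    ("u1", "e1", 1559100005000),  # duplicate of e1, later timestamp
    ("u1", "e2", 1559100010000),
    ("u2", "e1", 1559100020000),
]

# Inner query: GROUP BY event_id, user_id keeps one entry per (user, event)
# and takes the max timestamp per group, mirroring maxtimestamp().
inner = {}
for user_id, event_id, rowtime in rows:
    key = (user_id, event_id)
    inner[key] = max(inner.get(key, 0), rowtime)

# Outer query: COUNT(event_id) per user. Because the inner group-by already
# collapsed duplicates, a plain COUNT here equals COUNT(DISTINCT event_id)
# over the raw rows, which is why adding DISTINCT in the outer query is a no-op.
event_count = defaultdict(int)
for (user_id, _event_id), _max_ts in inner.items():
    event_count[user_id] += 1

# COUNT(DISTINCT event_id) computed directly on the raw rows, for comparison.
distinct_count = {u: len({e for uu, e, _ in rows if uu == u}) for u in event_count}

assert dict(event_count) == distinct_count  # deduped COUNT == COUNT(DISTINCT)
```

The model also illustrates the caveat raised at the end of the thread: the inner `inner` dict plays the role of Flink's keyed state, so dedup only holds while a (user_id, event_id) entry is retained.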