More details on the error from query #1, the one that used COUNT(DISTINCT(...)):

org.apache.flink.table.api.TableException: Cannot generate a valid
execution plan for the given query:

FlinkLogicalCalc(expr#0..8=[{inputs}], expr#9=[_UTF-16LE'-'],
expr#10=[CAST($t1):VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE
"ISO-8859-1$en_US$primary"], expr#11=[CAST($t2):VARCHAR(65536) CHARACTER
SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary"],
expr#12=[CAST($t0):VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE
"ISO-8859-1$en_US$primary"], expr#13=[CONCAT_WS($t9, $t10, $t11, $t12)],
EXPR$0=[$t13], mastercard_world_elite_monthly_rides_encoded=[$t8],
lower_boundary=[$t3], latency_marker=[$t4])
  FlinkLogicalJoin(condition=[AND(IS NOT DISTINCT FROM($0, $5), IS NOT
DISTINCT FROM($1, $6), IS NOT DISTINCT FROM($2, $7))], joinType=[inner])
    FlinkLogicalAggregate(group=[{0, 1, 2}],
lower_boundary=[mintimestamp($4)], latency_marker=[maxtimestamp($4)])
      FlinkLogicalCalc(expr#0..6=[{inputs}], expr#7=[FLAG(MONTH)],
expr#8=[CAST($t6):TIMESTAMP(3) NOT NULL], expr#9=[longToDateTime($t8)],
expr#10=[Reinterpret($t9)], expr#11=[86400000], expr#12=[/INT($t10, $t11)],
expr#13=[EXTRACT_DATE($t7, $t12)], expr#14=[FLAG(YEAR)],
expr#15=[EXTRACT_DATE($t14, $t12)], expr#16=[_UTF-16LE'USD'],
expr#17=[=($t4, $t16)], expr#18=[_UTF-16LE'MWE'], expr#19=[=($t2, $t18)],
expr#20=[NOT($t5)], expr#21=[AND($t17, $t19, $t20)], user_lyft_id=[$t3],
$f1=[$t13], $f2=[$t15], ride_id=[$t1], $f4=[$t9], $condition=[$t21])
        FlinkLogicalNativeTableScan(table=[[event_pay_passenger_payment_succeeded_for_rewards]])
    FlinkLogicalAggregate(group=[{0, 1, 2}],
mastercard_world_elite_monthly_rides_encoded=[COUNT($3)])
      FlinkLogicalAggregate(group=[{0, 1, 2, 3}])
        FlinkLogicalCalc(expr#0..6=[{inputs}], expr#7=[FLAG(MONTH)],
expr#8=[CAST($t6):TIMESTAMP(3) NOT NULL], expr#9=[longToDateTime($t8)],
expr#10=[Reinterpret($t9)], expr#11=[86400000], expr#12=[/INT($t10, $t11)],
expr#13=[EXTRACT_DATE($t7, $t12)], expr#14=[FLAG(YEAR)],
expr#15=[EXTRACT_DATE($t14, $t12)], expr#16=[_UTF-16LE'USD'],
expr#17=[=($t4, $t16)], expr#18=[_UTF-16LE'MWE'], expr#19=[=($t2, $t18)],
expr#20=[NOT($t5)], expr#21=[AND($t17, $t19, $t20)], user_lyft_id=[$t3],
$f1=[$t13], $f2=[$t15], ride_id=[$t1], $condition=[$t21])
          FlinkLogicalNativeTableScan(table=[[event_pay_passenger_payment_succeeded_for_rewards]])

This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL
features.

at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:274)
at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:683)
at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:730)
at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:414)



On Wed, May 29, 2019 at 1:49 PM Vinod Mehra <vme...@lyft.com> wrote:

> Hi!
>
> We are using apache-flink-1.4.2. It seems this version doesn't support
> COUNT(DISTINCT). I am trying to find a way to dedup the stream, so I tried:
>
> SELECT
>     CONCAT_WS(
>       '-',
>       CAST(MONTH(longToDateTime(rowtime)) AS VARCHAR),
>       CAST(YEAR(longToDateTime(rowtime)) AS VARCHAR),
>       CAST(user_id AS VARCHAR)
>     ),
>     COUNT(DISTINCT(event_id)) AS event_count
> FROM event_foo
> GROUP BY user_id, MONTH(longToDateTime(rowtime)),
> YEAR(longToDateTime(rowtime))
>
>
> (The duplicate events have the same 'event_id' (and user_id); the other
> fields, e.g. timestamps, may or may not differ.)
>
>
> But that failed because DISTINCT is not supported. As a workaround I tried:
>
> SELECT
>     CONCAT_WS(
>       '-',
>       CAST(MONTH(row_datetime) AS VARCHAR),
>       CAST(YEAR(row_datetime) AS VARCHAR),
>       CAST(user_id AS VARCHAR)
>     ),
>     COUNT(event_id) AS event_count
> FROM (
>     SELECT
>         user_id,
>         event_id,
>         maxtimestamp(longToDateTime(rowtime)) as row_datetime
>     FROM event_foo
>     GROUP BY event_id, user_id
> )
> GROUP BY user_id, MONTH(row_datetime), YEAR(row_datetime)
>
> I am hoping the inner SELECT does the deduping, because logically it is
> equivalent to a DISTINCT. This works in my functional testing.
>
> Will it also work if the duplicates span different event buckets? I was
> hoping that as long as the events arrive within the state "retention time"
> in Flink they would be deduped, but I am new to Flink so I am not sure about
> that. Can someone please correct me if I am wrong? Is this a reasonable
> workaround for the lack of DISTINCT support? Please let me know if there is
> a better way.
>
> Thanks,
> Vinod
>
>
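
To make the question about duplicates that span buckets concrete, here is a small Python sketch of the two queries quoted above. It models plain batch semantics only: it does not use Flink and says nothing about retractions or state retention. The function names (`count_distinct`, `two_level_count`, `bucket`, `ms`) and the sample events are hypothetical, and the month/year extraction assumes UTC epoch milliseconds, which may not match what `longToDateTime` does.

```python
from collections import defaultdict
from datetime import datetime, timezone


def bucket(user_id, ts_ms):
    """Grouping key (user_id, month, year), assuming UTC epoch milliseconds."""
    dt = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
    return (user_id, dt.month, dt.year)


def count_distinct(events):
    """Models COUNT(DISTINCT event_id) GROUP BY user_id, month, year."""
    seen = defaultdict(set)
    for user_id, event_id, ts_ms in events:
        seen[bucket(user_id, ts_ms)].add(event_id)
    return {key: len(ids) for key, ids in seen.items()}


def two_level_count(events):
    """Models the workaround: the inner GROUP BY event_id, user_id keeps the
    max timestamp, then the outer query counts rows per (user, month, year)."""
    latest = {}
    for user_id, event_id, ts_ms in events:
        key = (event_id, user_id)
        latest[key] = max(latest.get(key, ts_ms), ts_ms)
    counts = defaultdict(int)
    for (event_id, user_id), ts_ms in latest.items():
        counts[bucket(user_id, ts_ms)] += 1
    return dict(counts)


def ms(year, month, day):
    """Helper: UTC midnight of the given date as epoch milliseconds."""
    return int(datetime(year, month, day, tzinfo=timezone.utc).timestamp() * 1000)


# Duplicates of event "a" fall inside the same month.
same_month = [(1, "a", ms(2019, 5, 1)), (1, "a", ms(2019, 5, 2)), (1, "b", ms(2019, 5, 3))]
# Duplicates of event "a" straddle a month boundary.
cross_month = [(1, "a", ms(2019, 5, 31)), (1, "a", ms(2019, 6, 1))]

print(count_distinct(same_month), two_level_count(same_month))
print(count_distinct(cross_month), two_level_count(cross_month))
```

In this model the two queries agree when all duplicates of an event land in the same month. When the duplicates straddle a month boundary they differ: the per-month distinct count sees the event once in each month, while the workaround counts it once, in the month of its latest timestamp. Whether the running Flink job behaves this way also depends on the inner aggregate's state still being retained when the late duplicate arrives.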
