Another interesting thing is that if I add DISTINCT in the 2nd query it
doesn't complain. But because of the inner-select it is a no-op because the
inner select is doing the deduping:

SELECT

    CONCAT_WS(

      '-',

      CAST(MONTH(row_datetime) AS VARCHAR),

      CAST(YEAR(row_datetime) AS VARCHAR),

      CAST(user_id AS VARCHAR)

    ),

    COUNT(*DISTINCT*(event_id)) AS event_count -- note the DISTINCT keyword
here. Flink doesn't barf for this.

FROM (

    SELECT

        user_id,

        event_id,

        maxtimestamp(longToDateTime(rowtime)) as row_datetime

    FROM event_foo

    GROUP BY event_id, user_id

)

GROUP BY user_id, MONTH(row_datetime), YEAR(row_datetime)

On Wed, May 29, 2019 at 5:15 PM Vinod Mehra <vme...@lyft.com> wrote:

> More details on the error with query#1 that used COUNT(DISTINCT()):
>
> org.apache.flink.table.api.TableException: Cannot generate a valid
> execution plan for the given query:
>
> FlinkLogicalCalc(expr#0..8=[{inputs}], expr#9=[_UTF-16LE'-'],
> expr#10=[CAST($t1):VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE
> "ISO-8859-1$en_US$primary"], expr#11=[CAST($t2):VARCHAR(65536) CHARACTER
> SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary"],
> expr#12=[CAST($t0):VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE
> "ISO-8859-1$en_US$primary"], expr#13=[CONCAT_WS($t9, $t10, $t11, $t12)],
> EXPR$0=[$t13], mastercard_world_elite_monthly_rides_encoded=[$t8],
> lower_boundary=[$t3], latency_marker=[$t4])
>   FlinkLogicalJoin(condition=[AND(IS NOT DISTINCT FROM($0, $5), IS NOT
> DISTINCT FROM($1, $6), IS NOT DISTINCT FROM($2, $7))], joinType=[inner])
>     FlinkLogicalAggregate(group=[{0, 1, 2}],
> lower_boundary=[mintimestamp($4)], latency_marker=[maxtimestamp($4)])
>       FlinkLogicalCalc(expr#0..6=[{inputs}], expr#7=[FLAG(MONTH)],
> expr#8=[CAST($t6):TIMESTAMP(3) NOT NULL], expr#9=[longToDateTime($t8)],
> expr#10=[Reinterpret($t9)], expr#11=[86400000], expr#12=[/INT($t10, $t11)],
> expr#13=[EXTRACT_DATE($t7, $t12)], expr#14=[FLAG(YEAR)],
> expr#15=[EXTRACT_DATE($t14, $t12)], expr#16=[_UTF-16LE'USD'],
> expr#17=[=($t4, $t16)], expr#18=[_UTF-16LE'MWE'], expr#19=[=($t2, $t18)],
> expr#20=[NOT($t5)], expr#21=[AND($t17, $t19, $t20)], user_lyft_id=[$t3],
> $f1=[$t13], $f2=[$t15], ride_id=[$t1], $f4=[$t9], $condition=[$t21])
>
> FlinkLogicalNativeTableScan(table=[[event_pay_passenger_payment_succeeded_for_rewards]])
>     FlinkLogicalAggregate(group=[{0, 1, 2}],
> mastercard_world_elite_monthly_rides_encoded=[COUNT($3)])
>       FlinkLogicalAggregate(group=[{0, 1, 2, 3}])
>         FlinkLogicalCalc(expr#0..6=[{inputs}], expr#7=[FLAG(MONTH)],
> expr#8=[CAST($t6):TIMESTAMP(3) NOT NULL], expr#9=[longToDateTime($t8)],
> expr#10=[Reinterpret($t9)], expr#11=[86400000], expr#12=[/INT($t10, $t11)],
> expr#13=[EXTRACT_DATE($t7, $t12)], expr#14=[FLAG(YEAR)],
> expr#15=[EXTRACT_DATE($t14, $t12)], expr#16=[_UTF-16LE'USD'],
> expr#17=[=($t4, $t16)], expr#18=[_UTF-16LE'MWE'], expr#19=[=($t2, $t18)],
> expr#20=[NOT($t5)], expr#21=[AND($t17, $t19, $t20)], user_lyft_id=[$t3],
> $f1=[$t13], $f2=[$t15], ride_id=[$t1], $condition=[$t21])
>
> FlinkLogicalNativeTableScan(table=[[event_pay_passenger_payment_succeeded_for_rewards]])
>
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL
> features.
>
> at
> org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:274)
> at
> org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:683)
> at
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:730)
> at
> org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:414)
>
>
>
> On Wed, May 29, 2019 at 1:49 PM Vinod Mehra <vme...@lyft.com> wrote:
>
>> Hi!
>>
>> We are using apache-flink-1.4.2. It seems this version doesn't support
>> count(DISTINCT). I am trying to find a way to dedup the stream. So I tried:
>>
>> SELECT
>>
>>     CONCAT_WS(
>>
>>       '-',
>>
>>       CAST(MONTH(longToDateTime(rowtime)) AS VARCHAR),
>>
>>       CAST(YEAR(longToDateTime(rowtime)) AS VARCHAR),
>>
>>       CAST(user_id AS VARCHAR)
>>
>>     ),
>>
>>     COUNT(*DISTINCT*(event_id)) AS event_count
>>
>> FROM event_foo
>>
>> GROUP BY user_id, MONTH(longToDateTime(rowtime)),
>> YEAR(longToDateTime(rowtime))
>>
>>
>> (the duplicate events have the same 'event_id' (and user_id), the other
>> fields e.g. timestamps may or may not be different)
>>
>>
>> But that failed because DISTINCT is not supported. As a workaround I
>> tried:
>>
>> SELECT
>>
>>     CONCAT_WS(
>>
>>       '-',
>>
>>       CAST(MONTH(row_datetime) AS VARCHAR),
>>
>>       CAST(YEAR(row_datetime) AS VARCHAR),
>>
>>       CAST(user_id AS VARCHAR)
>>
>>     ),
>>
>>     COUNT(event_id) AS event_count
>>
>> FROM (
>>
>>     SELECT
>>
>>         user_id,
>>
>>         event_id,
>>
>>         maxtimestamp(longToDateTime(rowtime)) as row_datetime
>>
>>     FROM event_foo
>>
>>     GROUP BY event_id, user_id
>>
>> )
>>
>> GROUP BY user_id, MONTH(row_datetime), YEAR(row_datetime)
>>
>> I am hoping the inner SELECT to do the deduping because logically it is
>> equivalent to a DISTINCT. This works in my functional testing.
>>
>> Will it also work if the dedups span different event buckets? I was
>> hoping that as long as the events arrive within the state "retention time"
>> in flink they should be deduped but I am new to Flink so I am not sure
>> about that. Can someone please correct me if I am wrong? Is this a
>> reasonable workaround for lack of DISTINCT support? Please let me know if
>> there is a better way.
>>
>> Thanks,
>> Vinod
>>
>>

Reply via email to