Hi Vinod,

IIRC, support for DISTINCT aggregates was added in Flink 1.6.0 (released
August 9, 2018) [1].
Also note that by default, this query will accumulate more and more state,
i.e., for each grouping key it will hold all unique event_ids.
You could configure an idle state retention time [2] to clean up unused state.
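
To make the state growth concrete, here is a toy Python model (not Flink
code; DistinctCountState, retention_ms and the sample keys are made up for
this sketch). It keeps one set of event_ids per grouping key and, if a
retention time is given, drops keys that have been idle for longer than
that. Flink's own setting takes a minimum and a maximum retention time; the
sketch collapses them into a single value.

```python
DAY_MS = 24 * 60 * 60 * 1000


class DistinctCountState:
    """Toy model of the per-key state behind COUNT(DISTINCT event_id)."""

    def __init__(self, retention_ms=None):
        self.retention_ms = retention_ms  # None means: never clean up
        self.seen = {}          # grouping key -> set of event_ids
        self.last_access = {}   # grouping key -> time of last update

    def expire(self, now_ms):
        """Drop every key that has been idle longer than the retention time."""
        if self.retention_ms is None:
            return
        idle = [k for k, t in self.last_access.items()
                if now_ms - t > self.retention_ms]
        for k in idle:
            del self.seen[k]
            del self.last_access[k]

    def add(self, key, event_id, now_ms):
        """Record one event and return the current distinct count for key."""
        self.expire(now_ms)
        self.seen.setdefault(key, set()).add(event_id)
        self.last_access[key] = now_ms
        return len(self.seen[key])

    def state_size(self):
        """Total number of event_ids currently held in state."""
        return sum(len(ids) for ids in self.seen.values())


unbounded = DistinctCountState()
bounded = DistinctCountState(retention_ms=35 * DAY_MS)

# One grouping key per (user, month, year); two unique events plus one
# duplicate arrive in each of six consecutive months.
for month in range(1, 7):
    key = ("user-1", month, 2019)
    now = month * 30 * DAY_MS
    for state in (unbounded, bounded):
        state.add(key, f"e{month}a", now)
        state.add(key, f"e{month}a", now)  # duplicate, not counted twice
        state.add(key, f"e{month}b", now)

print(unbounded.state_size())  # prints 12
print(bounded.state_size())    # prints 4
```

The flip side is that once the state of a key has been removed, a late
duplicate for that key is counted as if it were new.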

Regarding the boundaries, with the current query they are fixed to one
month and sharply cut (as one would expect).
You could try to use a long-running session window [3]. This would also
remove the need for the idle state configuration because Flink would know
when state can be discarded.
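
The following toy Python model (again not Flink code; the function name,
the tuple layout and the sample data are assumptions for this sketch) shows
the idea: events of a user are grouped into sessions that are separated by
an inactivity gap, duplicates are counted once per session, and a session's
state is dropped as soon as the session closes.

```python
def session_distinct_counts(events, gap_ms):
    """Count distinct event_ids per user session.

    events: iterable of (user_id, event_id, ts_ms) tuples.
    Returns a list of (user_id, first_ts, last_ts, distinct_count).
    """
    results = []
    open_sessions = {}  # user_id -> [first_ts, last_ts, set of event_ids]
    for user_id, event_id, ts in sorted(events, key=lambda e: e[2]):
        session = open_sessions.get(user_id)
        if session is not None and ts - session[1] > gap_ms:
            # The gap has passed: emit the result and discard the state.
            results.append((user_id, session[0], session[1], len(session[2])))
            session = None
        if session is None:
            session = [ts, ts, set()]
            open_sessions[user_id] = session
        session[1] = ts
        session[2].add(event_id)
    # Flush the sessions that are still open at the end of the input.
    for user_id, session in open_sessions.items():
        results.append((user_id, session[0], session[1], len(session[2])))
    return results


events = [
    ("u1", "a", 0),
    ("u1", "a", 1_000),    # duplicate within the same session
    ("u1", "b", 2_000),
    ("u2", "c", 500),
    ("u1", "a", 100_000),  # arrives after the gap, starts a new session
]
for row in sorted(session_distinct_counts(events, gap_ms=10_000)):
    print(row)
```

Here u1 ends up with two sessions (2 distinct events, then 1) and u2 with
one. Note that the boundaries then follow user activity rather than
calendar months.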

Hope this helps,
Fabian

[1]
https://flink.apache.org/news/2018/08/09/release-1.6.0.html#enhancing-sql-and-table-api
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/query_configuration.html#idle-state-retention-time
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/sql.html#group-windows

On Thu, May 30, 2019 at 02:18, Vinod Mehra <vme...@lyft.com> wrote:

> Another interesting thing is that if I add DISTINCT in the 2nd query it
> doesn't complain. But it is a no-op there, because the inner select
> already does the deduping:
>
> SELECT
>     CONCAT_WS(
>       '-',
>       CAST(MONTH(row_datetime) AS VARCHAR),
>       CAST(YEAR(row_datetime) AS VARCHAR),
>       CAST(user_id AS VARCHAR)
>     ),
>     -- note the DISTINCT keyword here. Flink doesn't barf for this.
>     COUNT(DISTINCT(event_id)) AS event_count
> FROM (
>     SELECT
>         user_id,
>         event_id,
>         maxtimestamp(longToDateTime(rowtime)) as row_datetime
>     FROM event_foo
>     GROUP BY event_id, user_id
> )
> GROUP BY user_id, MONTH(row_datetime), YEAR(row_datetime)
>
> On Wed, May 29, 2019 at 5:15 PM Vinod Mehra <vme...@lyft.com> wrote:
>
>> More details on the error with query#1 that used COUNT(DISTINCT()):
>>
>> org.apache.flink.table.api.TableException: Cannot generate a valid
>> execution plan for the given query:
>>
>> FlinkLogicalCalc(expr#0..8=[{inputs}], expr#9=[_UTF-16LE'-'],
>> expr#10=[CAST($t1):VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE
>> "ISO-8859-1$en_US$primary"], expr#11=[CAST($t2):VARCHAR(65536) CHARACTER
>> SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary"],
>> expr#12=[CAST($t0):VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE
>> "ISO-8859-1$en_US$primary"], expr#13=[CONCAT_WS($t9, $t10, $t11, $t12)],
>> EXPR$0=[$t13], mastercard_world_elite_monthly_rides_encoded=[$t8],
>> lower_boundary=[$t3], latency_marker=[$t4])
>>   FlinkLogicalJoin(condition=[AND(IS NOT DISTINCT FROM($0, $5), IS NOT
>> DISTINCT FROM($1, $6), IS NOT DISTINCT FROM($2, $7))], joinType=[inner])
>>     FlinkLogicalAggregate(group=[{0, 1, 2}],
>> lower_boundary=[mintimestamp($4)], latency_marker=[maxtimestamp($4)])
>>       FlinkLogicalCalc(expr#0..6=[{inputs}], expr#7=[FLAG(MONTH)],
>> expr#8=[CAST($t6):TIMESTAMP(3) NOT NULL], expr#9=[longToDateTime($t8)],
>> expr#10=[Reinterpret($t9)], expr#11=[86400000], expr#12=[/INT($t10, $t11)],
>> expr#13=[EXTRACT_DATE($t7, $t12)], expr#14=[FLAG(YEAR)],
>> expr#15=[EXTRACT_DATE($t14, $t12)], expr#16=[_UTF-16LE'USD'],
>> expr#17=[=($t4, $t16)], expr#18=[_UTF-16LE'MWE'], expr#19=[=($t2, $t18)],
>> expr#20=[NOT($t5)], expr#21=[AND($t17, $t19, $t20)], user_lyft_id=[$t3],
>> $f1=[$t13], $f2=[$t15], ride_id=[$t1], $f4=[$t9], $condition=[$t21])
>>
>> FlinkLogicalNativeTableScan(table=[[event_pay_passenger_payment_succeeded_for_rewards]])
>>     FlinkLogicalAggregate(group=[{0, 1, 2}],
>> mastercard_world_elite_monthly_rides_encoded=[COUNT($3)])
>>       FlinkLogicalAggregate(group=[{0, 1, 2, 3}])
>>         FlinkLogicalCalc(expr#0..6=[{inputs}], expr#7=[FLAG(MONTH)],
>> expr#8=[CAST($t6):TIMESTAMP(3) NOT NULL], expr#9=[longToDateTime($t8)],
>> expr#10=[Reinterpret($t9)], expr#11=[86400000], expr#12=[/INT($t10, $t11)],
>> expr#13=[EXTRACT_DATE($t7, $t12)], expr#14=[FLAG(YEAR)],
>> expr#15=[EXTRACT_DATE($t14, $t12)], expr#16=[_UTF-16LE'USD'],
>> expr#17=[=($t4, $t16)], expr#18=[_UTF-16LE'MWE'], expr#19=[=($t2, $t18)],
>> expr#20=[NOT($t5)], expr#21=[AND($t17, $t19, $t20)], user_lyft_id=[$t3],
>> $f1=[$t13], $f2=[$t15], ride_id=[$t1], $condition=[$t21])
>>
>> FlinkLogicalNativeTableScan(table=[[event_pay_passenger_payment_succeeded_for_rewards]])
>>
>> This exception indicates that the query uses an unsupported SQL feature.
>> Please check the documentation for the set of currently supported SQL
>> features.
>>
>> at
>> org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:274)
>> at
>> org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:683)
>> at
>> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:730)
>> at
>> org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:414)
>>
>>
>>
>> On Wed, May 29, 2019 at 1:49 PM Vinod Mehra <vme...@lyft.com> wrote:
>>
>>> Hi!
>>>
>>> We are using apache-flink-1.4.2. It seems this version doesn't support
>>> count(DISTINCT). I am trying to find a way to dedup the stream. So I tried:
>>>
>>> SELECT
>>>     CONCAT_WS(
>>>       '-',
>>>       CAST(MONTH(longToDateTime(rowtime)) AS VARCHAR),
>>>       CAST(YEAR(longToDateTime(rowtime)) AS VARCHAR),
>>>       CAST(user_id AS VARCHAR)
>>>     ),
>>>     COUNT(DISTINCT(event_id)) AS event_count
>>> FROM event_foo
>>> GROUP BY user_id, MONTH(longToDateTime(rowtime)),
>>>     YEAR(longToDateTime(rowtime))
>>>
>>>
>>> (the duplicate events have the same 'event_id' (and user_id), the other
>>> fields e.g. timestamps may or may not be different)
>>>
>>>
>>> But that failed because DISTINCT is not supported. As a workaround I
>>> tried:
>>>
>>> SELECT
>>>     CONCAT_WS(
>>>       '-',
>>>       CAST(MONTH(row_datetime) AS VARCHAR),
>>>       CAST(YEAR(row_datetime) AS VARCHAR),
>>>       CAST(user_id AS VARCHAR)
>>>     ),
>>>     COUNT(event_id) AS event_count
>>> FROM (
>>>     SELECT
>>>         user_id,
>>>         event_id,
>>>         maxtimestamp(longToDateTime(rowtime)) as row_datetime
>>>     FROM event_foo
>>>     GROUP BY event_id, user_id
>>> )
>>> GROUP BY user_id, MONTH(row_datetime), YEAR(row_datetime)
>>>
>>> I am hoping the inner SELECT does the deduping, because logically it is
>>> equivalent to a DISTINCT. This works in my functional testing.
>>>
>>> Will it also work if the dedups span different event buckets? I was
>>> hoping that as long as the events arrive within the state "retention time"
>>> in flink they should be deduped but I am new to Flink so I am not sure
>>> about that. Can someone please correct me if I am wrong? Is this a
>>> reasonable workaround for lack of DISTINCT support? Please let me know if
>>> there is a better way.
>>>
>>> Thanks,
>>> Vinod
>>>
>>>
