To be clear, I want the outer grouping to have a longer retention time (on
the order of a week or a month, which is what we are using 'idle state
retention time' for) and the inner grouping to have a shorter retention
period (1 hour max). So I am hoping the session window will do the right
thing.
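
For reference, here is roughly how we are wiring the two layers together
(a minimal sketch against the Flink 1.4.2 Table API; table registration
and our longToDateTime / maxtimestamp UDFs are set up elsewhere, and
monthlyDedupSql is just an illustrative name for the nested query quoted
below):

import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class MonthlyDedupJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // ... register event_foo and the longToDateTime / maxtimestamp UDFs here ...
        String monthlyDedupSql = "..."; // paste the nested query quoted below

        // Outer grouping (user/month) is cleaned up by idle state retention,
        // so keep it on the order of a week or a month. The inner 1-hour
        // SESSION window discards its own state when the session closes.
        StreamQueryConfig qConfig = tEnv.queryConfig();
        qConfig.withIdleStateRetentionTime(Time.days(7), Time.days(30));

        Table result = tEnv.sqlQuery(monthlyDedupSql);
        tEnv.toRetractStream(result, Row.class, qConfig).print();

        env.execute("monthly-dedup");
    }
}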

Thanks,
Vinod

On Tue, Jun 4, 2019 at 5:14 PM Vinod Mehra <vme...@lyft.com> wrote:

> Thanks a lot Fabian for the detailed response. I know all the duplicates
> are going to arrive within an hour max of the actual event. So using a 1
> hour running session window should be fine for me.
>
> Is the following the right way to do it in apache-flink-1.4.2?
>
> SELECT
>     CONCAT_WS(
>       '-',
>       CAST(event_month AS VARCHAR),
>       CAST(event_year AS VARCHAR),
>       CAST(user_id AS VARCHAR)
>     ),
>     COUNT(event_id) AS event_count
> FROM (
>     SELECT
>         user_id,
>         event_id,
>         MAX(MONTH(longToDateTime(rowtime))) AS event_month,
>         MAX(YEAR(longToDateTime(rowtime))) AS event_year
>     FROM event_foo
>     GROUP BY event_id, user_id, SESSION(rowtime, INTERVAL '1' HOUR) -- 1 hour running session window
> )
> GROUP BY user_id, event_month, event_year
>
>
>
> We are also using idle state retention time to clean up unused state, but
> that is much longer (a week or a month depending on the use case). We will
> switch to COUNT(DISTINCT) as soon as we move to a newer Flink version, so
> the nested SELECT above is going to be a stopgap until then.
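>
> Once we are on Flink >= 1.6 the plan is to collapse this into a single
> aggregation, roughly as follows (an untested sketch on our side; tEnv is
> our StreamTableEnvironment, and event_foo plus the longToDateTime UDF are
> registered elsewhere):
>
> Table monthlyCounts = tEnv.sqlQuery(
>     "SELECT user_id, " +
>     "       MONTH(longToDateTime(rowtime)) AS event_month, " +
>     "       YEAR(longToDateTime(rowtime)) AS event_year, " +
>     "       COUNT(DISTINCT event_id) AS event_count " +
>     "FROM event_foo " +
>     "GROUP BY user_id, MONTH(longToDateTime(rowtime)), " +
>     "         YEAR(longToDateTime(rowtime))");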
>
> Thanks,
> Vinod
>
> On Mon, Jun 3, 2019 at 4:52 AM Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Vinod,
>>
>> IIRC, support for DISTINCT aggregates was added in Flink 1.6.0 (released
>> August, 9th 2018) [1].
>> Also note that by default, this query will accumulate more and more
>> state, i.e., for each grouping key it will hold all unique event_ids.
>> You could configure an idle state retention time to clean up unused
>> state [2].
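>>
>> For example (a minimal sketch; tableEnv and resultTable stand for your
>> StreamTableEnvironment and query result, and the retention values depend
>> on your use case, see [2]):
>>
>> StreamQueryConfig qConfig = tableEnv.queryConfig();
>> // State that has not been accessed for 7 days becomes eligible for
>> // cleanup; it is removed after 30 days at the latest.
>> qConfig.withIdleStateRetentionTime(Time.days(7), Time.days(30));
>> tableEnv.toRetractStream(resultTable, Row.class, qConfig);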
>>
>> Regarding the boundaries, with the current query they are fixed to one
>> month and sharply cut (as one would expect).
>> You could try to use a long running session window [3]. This would also
>> remove the need for the idle state configuration because Flink would know
>> when state can be discarded.
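>>
>> For example, something along these lines (a sketch; the DISTINCT
>> aggregate again needs Flink 1.6+, rowtime must be an event-time
>> attribute, and note that the 30-day session gap replaces the sharp
>> calendar-month boundary):
>>
>> Table result = tableEnv.sqlQuery(
>>     "SELECT user_id, " +
>>     "       SESSION_START(rowtime, INTERVAL '30' DAY) AS window_start, " +
>>     "       COUNT(DISTINCT event_id) AS event_count " +
>>     "FROM event_foo " +
>>     "GROUP BY user_id, SESSION(rowtime, INTERVAL '30' DAY)");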
>>
>> Hope this helps,
>> Fabian
>>
>> [1]
>> https://flink.apache.org/news/2018/08/09/release-1.6.0.html#enhancing-sql-and-table-api
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/query_configuration.html#idle-state-retention-time
>> [3]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/sql.html#group-windows
>>
>> On Thu, May 30, 2019 at 2:18 AM Vinod Mehra <vme...@lyft.com> wrote:
>>
>>> Another interesting thing is that if I add DISTINCT in the 2nd query,
>>> Flink doesn't complain. But it is a no-op, because the inner SELECT is
>>> already doing the deduping:
>>>
>>> SELECT
>>>     CONCAT_WS(
>>>       '-',
>>>       CAST(MONTH(row_datetime) AS VARCHAR),
>>>       CAST(YEAR(row_datetime) AS VARCHAR),
>>>       CAST(user_id AS VARCHAR)
>>>     ),
>>>     COUNT(DISTINCT(event_id)) AS event_count -- note the DISTINCT keyword here. Flink doesn't barf for this.
>>> FROM (
>>>     SELECT
>>>         user_id,
>>>         event_id,
>>>         maxtimestamp(longToDateTime(rowtime)) AS row_datetime
>>>     FROM event_foo
>>>     GROUP BY event_id, user_id
>>> )
>>> GROUP BY user_id, MONTH(row_datetime), YEAR(row_datetime)
>>>
>>> On Wed, May 29, 2019 at 5:15 PM Vinod Mehra <vme...@lyft.com> wrote:
>>>
>>>> More details on the error with query #1, which used COUNT(DISTINCT):
>>>>
>>>> org.apache.flink.table.api.TableException: Cannot generate a valid
>>>> execution plan for the given query:
>>>>
>>>> FlinkLogicalCalc(expr#0..8=[{inputs}], expr#9=[_UTF-16LE'-'],
>>>> expr#10=[CAST($t1):VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE
>>>> "ISO-8859-1$en_US$primary"], expr#11=[CAST($t2):VARCHAR(65536) CHARACTER
>>>> SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary"],
>>>> expr#12=[CAST($t0):VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE
>>>> "ISO-8859-1$en_US$primary"], expr#13=[CONCAT_WS($t9, $t10, $t11, $t12)],
>>>> EXPR$0=[$t13], mastercard_world_elite_monthly_rides_encoded=[$t8],
>>>> lower_boundary=[$t3], latency_marker=[$t4])
>>>>   FlinkLogicalJoin(condition=[AND(IS NOT DISTINCT FROM($0, $5), IS NOT
>>>> DISTINCT FROM($1, $6), IS NOT DISTINCT FROM($2, $7))], joinType=[inner])
>>>>     FlinkLogicalAggregate(group=[{0, 1, 2}],
>>>> lower_boundary=[mintimestamp($4)], latency_marker=[maxtimestamp($4)])
>>>>       FlinkLogicalCalc(expr#0..6=[{inputs}], expr#7=[FLAG(MONTH)],
>>>> expr#8=[CAST($t6):TIMESTAMP(3) NOT NULL], expr#9=[longToDateTime($t8)],
>>>> expr#10=[Reinterpret($t9)], expr#11=[86400000], expr#12=[/INT($t10, $t11)],
>>>> expr#13=[EXTRACT_DATE($t7, $t12)], expr#14=[FLAG(YEAR)],
>>>> expr#15=[EXTRACT_DATE($t14, $t12)], expr#16=[_UTF-16LE'USD'],
>>>> expr#17=[=($t4, $t16)], expr#18=[_UTF-16LE'MWE'], expr#19=[=($t2, $t18)],
>>>> expr#20=[NOT($t5)], expr#21=[AND($t17, $t19, $t20)], user_lyft_id=[$t3],
>>>> $f1=[$t13], $f2=[$t15], ride_id=[$t1], $f4=[$t9], $condition=[$t21])
>>>>
>>>> FlinkLogicalNativeTableScan(table=[[event_pay_passenger_payment_succeeded_for_rewards]])
>>>>     FlinkLogicalAggregate(group=[{0, 1, 2}],
>>>> mastercard_world_elite_monthly_rides_encoded=[COUNT($3)])
>>>>       FlinkLogicalAggregate(group=[{0, 1, 2, 3}])
>>>>         FlinkLogicalCalc(expr#0..6=[{inputs}], expr#7=[FLAG(MONTH)],
>>>> expr#8=[CAST($t6):TIMESTAMP(3) NOT NULL], expr#9=[longToDateTime($t8)],
>>>> expr#10=[Reinterpret($t9)], expr#11=[86400000], expr#12=[/INT($t10, $t11)],
>>>> expr#13=[EXTRACT_DATE($t7, $t12)], expr#14=[FLAG(YEAR)],
>>>> expr#15=[EXTRACT_DATE($t14, $t12)], expr#16=[_UTF-16LE'USD'],
>>>> expr#17=[=($t4, $t16)], expr#18=[_UTF-16LE'MWE'], expr#19=[=($t2, $t18)],
>>>> expr#20=[NOT($t5)], expr#21=[AND($t17, $t19, $t20)], user_lyft_id=[$t3],
>>>> $f1=[$t13], $f2=[$t15], ride_id=[$t1], $condition=[$t21])
>>>>
>>>> FlinkLogicalNativeTableScan(table=[[event_pay_passenger_payment_succeeded_for_rewards]])
>>>>
>>>> This exception indicates that the query uses an unsupported SQL feature.
>>>> Please check the documentation for the set of currently supported SQL
>>>> features.
>>>>
>>>> at
>>>> org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:274)
>>>> at
>>>> org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:683)
>>>> at
>>>> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:730)
>>>> at
>>>> org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:414)
>>>>
>>>>
>>>>
>>>> On Wed, May 29, 2019 at 1:49 PM Vinod Mehra <vme...@lyft.com> wrote:
>>>>
>>>>> Hi!
>>>>>
>>>>> We are using apache-flink-1.4.2. It seems this version doesn't support
>>>>> COUNT(DISTINCT). I am trying to find a way to dedup the stream, so I
>>>>> tried:
>>>>>
>>>>> SELECT
>>>>>     CONCAT_WS(
>>>>>       '-',
>>>>>       CAST(MONTH(longToDateTime(rowtime)) AS VARCHAR),
>>>>>       CAST(YEAR(longToDateTime(rowtime)) AS VARCHAR),
>>>>>       CAST(user_id AS VARCHAR)
>>>>>     ),
>>>>>     COUNT(DISTINCT(event_id)) AS event_count
>>>>> FROM event_foo
>>>>> GROUP BY user_id, MONTH(longToDateTime(rowtime)),
>>>>>          YEAR(longToDateTime(rowtime))
>>>>>
>>>>>
>>>>> (The duplicate events have the same 'event_id' (and user_id); the
>>>>> other fields, e.g. timestamps, may or may not be different.)
>>>>>
>>>>>
>>>>> But that failed because DISTINCT is not supported. As a workaround I
>>>>> tried:
>>>>>
>>>>> SELECT
>>>>>     CONCAT_WS(
>>>>>       '-',
>>>>>       CAST(MONTH(row_datetime) AS VARCHAR),
>>>>>       CAST(YEAR(row_datetime) AS VARCHAR),
>>>>>       CAST(user_id AS VARCHAR)
>>>>>     ),
>>>>>     COUNT(event_id) AS event_count
>>>>> FROM (
>>>>>     SELECT
>>>>>         user_id,
>>>>>         event_id,
>>>>>         maxtimestamp(longToDateTime(rowtime)) AS row_datetime
>>>>>     FROM event_foo
>>>>>     GROUP BY event_id, user_id
>>>>> )
>>>>> GROUP BY user_id, MONTH(row_datetime), YEAR(row_datetime)
>>>>>
>>>>> I am hoping the inner SELECT will do the deduping, because logically
>>>>> it is equivalent to a DISTINCT. This works in my functional testing.
>>>>>
>>>>> Will it also work if the dedups span different event buckets? I was
>>>>> hoping that as long as the events arrive within the state "retention
>>>>> time" in Flink they would be deduped, but I am new to Flink so I am
>>>>> not sure about that. Can someone please correct me if I am wrong? Is
>>>>> this a reasonable workaround for the lack of DISTINCT support? Please
>>>>> let me know if there is a better way.
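>>>>>
>>>>> For what it's worth, this is how I have been poking at the behavior in
>>>>> tests (a sketch; dedupSql stands for the nested query above, and the
>>>>> table/UDF registration is omitted):
>>>>>
>>>>> Table result = tEnv.sqlQuery(dedupSql);
>>>>> // If I understand retract streams correctly, a duplicate whose
>>>>> // maxtimestamp moves into a new month should show up here as a
>>>>> // (false, row) retraction for the old bucket followed by a
>>>>> // (true, row) add for the new one.
>>>>> tEnv.toRetractStream(result, Row.class).print();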
>>>>>
>>>>> Thanks,
>>>>> Vinod
>>>>>
>>>>>
