Thanks for the hint! It's much simpler than what I initially tried, which
is nice, but for some reason it isn't catching all distinct values. First
of all, isn't this effectively a sliding window:
"rowtime RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW"?

My use case needs a tumbling window. I tried adding PARTITION BY
additionally with DATE_FORMAT(rowtime, '%Y%m%d%H') to achieve the same
result as with a tumbling window; this resulted in slightly more distinct
values, but was still missing some! Would there be some nice way to create
a tumbling window right in the RANGE condition instead?

As a disclaimer I have to say we seem to be fine using a simple window
_without_ any early triggering. But of course it would be nice to
understand how early triggering could be enabled in a simple & scalable way.
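
A minimal sketch of such a simple window, assuming an hourly TUMBLE over the
same events table and rowtime attribute as in the queries quoted below (the
1 hour size and the window_start alias are assumptions for illustration).
Without early triggering, each distinct combination is emitted only once the
watermark passes the end of its window:

```sql
-- Sketch only: one row per distinct (s_cid, s_aid1) per hourly tumbling window.
-- Assumes the `events` table and its `rowtime` event-time attribute.
SELECT
  s_cid,
  s_aid1,
  TUMBLE_START(rowtime, INTERVAL '1' HOUR) AS window_start
FROM events
WHERE s_aid1 IS NOT NULL
GROUP BY
  s_cid,
  s_aid1,
  TUMBLE(rowtime, INTERVAL '1' HOUR)
```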

Cheers,
Juho

On Mon, Feb 19, 2018 at 1:44 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Juho,
>
> sorry for the late response. I found time to look into this issue.
> I agree that the start and end timestamps of the HOP window should be 1
> hour apart from each other. I tried to reproduce the issue, but was not
> able to do so.
> Can you maybe open a JIRA and provide a simple test case (collection data
> source, no Kafka) that reproduces the issue?
>
> Regarding the task that you are trying to solve, have you looked into OVER
> windows?
>
> The following query would count for each record, how often a record with
> the same ID combination was observed in the last hour based on its
> timestamp:
>
> SELECT
>   s_aid1,
>   s_cid,
>   COUNT(*) OVER (
>     PARTITION BY s_aid1, s_cid
>     ORDER BY rowtime
>     RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
>   ) AS occurrence,
>   rowtime
> FROM events
> WHERE s_aid1 IS NOT NULL
>
> If occurrence is 1, the current record is the only record within the last
> 1 hour with the combination of aid and cid.
> The query does not batch the stream by 10 seconds, but rather produces the
> results in real-time. If the batching is not required, you should be good
> by adding a filter on occurrence = 1.
> Otherwise, you could add the filter and wrap it in a 10-second tumbling
> window.
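>
> A minimal sketch of the filter on occurrence = 1, assuming the OVER query
> above is used as a subquery (the alias "counted" is only illustrative):
>
> ```sql
> -- Sketch only: keep a record only if it is the sole record for its
> -- (s_aid1, s_cid) pair within the preceding hour.
> SELECT s_aid1, s_cid, rowtime
> FROM (
>   SELECT
>     s_aid1,
>     s_cid,
>     COUNT(*) OVER (
>       PARTITION BY s_aid1, s_cid
>       ORDER BY rowtime
>       RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
>     ) AS occurrence,
>     rowtime
>   FROM events
>   WHERE s_aid1 IS NOT NULL
> ) AS counted
> WHERE occurrence = 1
> ```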
>
> Hope this helps,
> Fabian
>
>
> 2018-02-14 15:30 GMT+01:00 Juho Autio <juho.au...@rovio.com>:
>
>> I'm joining a tumbling & hopping window in Flink 1.5-SNAPSHOT. The result
>> is unexpected. Am I doing something wrong? Maybe this is just not a
>> supported join type at all? Anyway, here goes:
>>
>> I first register these two tables:
>>
>> 1. new_ids: a tumbling window of seen ids within the last 10 seconds:
>>
>> SELECT
>>   s_aid1,
>>   s_cid,
>>   TS_MIN(rowtime) AS first_seen,
>>   CAST(DATE_FORMAT(TUMBLE_START(rowtime, INTERVAL '10' SECOND),
>>     '%Y%m%d/%H/%i/%S') AS VARCHAR) AS processdate,
>>   TUMBLE_START(rowtime, INTERVAL '10' SECOND) AS tumble_start,
>>   TUMBLE_END(rowtime, INTERVAL '10' SECOND) AS tumble_end
>> FROM events
>> WHERE s_aid1 IS NOT NULL
>> GROUP BY
>>   s_aid1,
>>   s_cid,
>>   TUMBLE(rowtime, INTERVAL '10' SECOND)
>>
>> 2. seen_ids: a sliding window of seen ids 1 hour backwards, 10 second hop:
>>
>> SELECT
>>   s_aid1,
>>   s_cid,
>>   TS_MIN(rowtime) AS first_seen,
>>   CAST(HOP_START(rowtime, INTERVAL '10' SECOND, INTERVAL '1' HOUR)
>>     AS DATE) AS processdate,
>>   HOP_START(rowtime, INTERVAL '10' SECOND, INTERVAL '1' HOUR)
>>     AS HOP_start,
>>   HOP_END(rowtime, INTERVAL '10' SECOND, INTERVAL '1' HOUR) AS HOP_end
>> FROM events
>> WHERE s_aid1 IS NOT NULL
>> GROUP BY
>>   s_aid1,
>>   s_cid,
>>   HOP(rowtime, INTERVAL '10' SECOND, INTERVAL '1' HOUR)
>>
>> If I write the results of the "seen_ids" table, the difference between
>> HOP_start and HOP_end is always 1 hour, as expected.
>>
>> Then I register another query that joins the 2 tables:
>>
>> unique_ids (mostly including fields for debugging - what I need is the
>> unique, new combinations of s_cid x s_aid1):
>>
>> SELECT
>>    new_ids.s_cid,
>>    new_ids.s_aid1,
>>    new_ids.processdate AS processdate,
>>    seen_ids.processdate AS seen_ids_processdate,
>>    new_ids.first_seen AS new_ids_first_seen,
>>    seen_ids.first_seen AS seen_ids_first_seen,
>>    tumble_start,
>>    HOP_start,
>>    tumble_end,
>>    HOP_end
>> FROM new_ids, seen_ids
>> WHERE new_ids.s_cid = seen_ids.s_cid
>>   AND new_ids.s_aid1 = seen_ids.s_aid1
>>   AND (new_ids.first_seen <= seen_ids.first_seen
>>     OR seen_ids.first_seen IS NULL)
>>
>> I print the results of this table, and surprisingly the HOP_start &
>> HOP_end are only separated by 10 seconds. Is this a bug?
>>
>> {
>>   "s_cid": "appsimulator_236e5fb7",
>>   "s_aid1": "L1GENe52d723b-b563-492f-942d-3dc1a31d7e26",
>>   "seen_ids_processdate": "2018-02-14",
>>   "seen_ids_first_seen": "2018-02-14 11:37:59.0",
>>   "new_ids_first_seen": "2018-02-14 11:34:33.0",
>>   "tumble_start": "2018-02-14 11:34:30.0",
>>   "tumble_end": "2018-02-14 11:34:40.0",
>>   "HOP_start": "2018-02-14 11:37:50.0",
>>   "HOP_end": "2018-02-14 11:38:00.0"
>> }
>>
>> What I'm trying to do is exclude the id from the current "new_ids" window
>> if it was already seen before (within the 1 hour scope of "seen_ids"), but
>> that doesn't work either. This example result row also shows that
>> "seen_ids.first_seen" is bigger than it should be.
>>
>>
>> Even if I can find a fix to this to get what I need, this strategy seems
>> overly complicated. If anyone can suggest a better way, I'd be glad to
>> hear. If this was a batch job, it could be defined simply as:
>>
>> SELECT DISTINCT s_cid, s_aid1, DATE_FORMAT(rowtime, '%Y%m%d/%H')
>> FROM events
>>
>> + when streaming this query, the new distinct values should be written
>> out every 10 seconds (ONLY the new ones - within that wrapping 1 hour
>> window). So far I haven't been able to figure out how to do that in a
>> simple way with Flink.
>>
>>
>> *) TS_MIN is a custom function, but it's just a mapping of Flink's
>> MinAggFunction:
>>
>> import java.sql.Timestamp
>>
>> import com.rovio.ds.flink.common.udaf.ImplicitOrdering.ordered
>>
>> import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
>> import org.apache.flink.table.functions.aggfunctions.MaxAggFunction
>> import org.apache.flink.table.functions.aggfunctions.MinAggFunction
>>
>> object TimestampAggFunctions {
>>
>>   trait TimestampAggFunction {
>>     def getInitValue = null
>>     def getValueTypeInfo = SqlTimeTypeInfo.TIMESTAMP
>>   }
>>
>>   class TimestampMinAggFunction extends MinAggFunction[Timestamp]
>>     with TimestampAggFunction
>>   class TimestampMaxAggFunction extends MaxAggFunction[Timestamp]
>>     with TimestampAggFunction
>>
>> }
>>
>> // Registered with:
>> tableEnv.registerFunction("TS_MIN", new TimestampMinAggFunction());
>>
>>
>
