Very interesting, Kurt! Yes, I also imagined it's rather a very common
case. In my company we currently have 3 clients wanting this functionality.
I also just realized this slight difference between Temporal Join and
Temporal Table Function Join, that there are actually two methods:)

Regarding option 1:
So I would need to:
* write a Datastream API source, that pulls Hive dictionary table every
let's say day, assigns event time column to rows and creates a stream of
it. It does that and only that.
* create a table (from Table API) out of it, assigning one of the columns
as an event time column.
* then use table.createTemporalTableFunction(<all columns, including time
column>)
* finally join my main data stream with the temporal table function (let me
use short name TTF from now) from my dictionary, using Flink SQL and LATERAL
TABLE (Rates(o.rowtime)) AS r construct.
And so I should achieve my temporal event-time based join with versioned
dictionaries!
Question 1: do I need to write that Hive source or can I use something
ready, like Hive catalog integration? Or maybe reuse e.g. HiveTableSource
class?

Question/worry 2: One thing that worried me is this comment in the docs:

*Note: State retention defined in a query configuration
<https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html>
is
not yet implemented for temporal joins. This means that the required state
to compute the query result might grow infinitely depending on the number
of distinct primary keys for the history table.  *

On the other side, I find this comment: *By definition of event
time, watermarks
<https://ci.apache.org/projects/flink/flink-docs-master/dev/event_time.html>
allow
the join operation to move forward in time and discard versions of the
build table that are no longer necessary because no incoming row with lower
or equal timestamp is expected.*
So I believe that the state would grow inifinitely if I had infinite number
of keys, but not only infinite number of versions of all keys. Which is
fine. Do you confirm?

Question 3: I need to be able to cover also reprocessing or backfilling of
historical data. Let's say I would need to join data stream and
(versioned/snapshotted) dictionaries stored on HDFS. Do you imagine that I
could use the same logic for both stream processing and reprocessing just
by replacing sources and sinks? Maybe after some slight modifications?


Regarding option 2:
Here I understand the current limitation (which will stay for some time )
is that the join can happen only on processing time, which means join only
with the latest version of dictionaries.
Accepting that, I understand I would need to do:
a) load Hive table to e.g. HBase and then use HBaseTableSource on it., OR
b) maybe implement Hive/JDBC-based LookupableTableSource that  pulls the
whole dictionary to memory (or even to Flink state, if it is possible to
use it from TableFunction).
Then use this table and my Kafka stream table in temporal join expressed
with Flink SQL.
What do you think, is that feasible?
Do I understand correctly, that this option is available only with Blink
engine and also only with use of Flink SQL, no Table API?

Same question comes up regarding reprocessing: do you think it would be
possible to use the same logic / SQL for reprocessing?

Thank you for continuing discussion with me. I believe we're here on a
subject of a really important design for the community.
Krzysztof

pt., 13 gru 2019 o 09:39 Kurt Young <ykt...@gmail.com> napisaƂ(a):

> Sorry I forgot to paste the reference url.
>
> Best,
> Kurt
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table-function
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
>
> On Fri, Dec 13, 2019 at 4:37 PM Kurt Young <ykt...@gmail.com> wrote:
>
>> Hi Krzysztof,
>>
>> What you raised also interested us a lot to achieve in Flink.
>> Unfortunately, there
>> is no in place solution in Table/SQL API yet, but you have 2 options
>> which are both
>> close to this thus need some modifications.
>>
>> 1. The first one is use temporal table function [1]. It needs you to
>> write the logic of
>> reading hive tables and do the daily update inside the table function.
>> 2. The second choice is to use temporal table join [2], which only works
>> with processing
>> time now (just like the simple solution you mentioned), and need the
>> table source has
>> look up capability (like hbase). Currently, hive connector doesn't
>> support look up, so to
>> make this work, you need to sync the content to other storages which
>> support look up,
>> like HBase.
>>
>> Both solutions are not ideal now, and we also aims to improve this maybe
>> in the following
>> release.
>>
>> Best,
>> Kurt
>>
>>
>> On Fri, Dec 13, 2019 at 1:44 AM Krzysztof Zarzycki <k.zarzy...@gmail.com>
>> wrote:
>>
>>> Hello dear Flinkers,
>>> If this kind of question was asked on the groups, I'm sorry for a
>>> duplicate. Feel free to just point me to the thread.
>>> I have to solve a probably pretty common case of joining a datastream to
>>> a dataset.
>>> Let's say I have the following setup:
>>> * I have a high pace stream of events coming in Kafka.
>>> * I have some dimension tables stored in Hive. These tables are changed
>>> daily. I can keep a snapshot for each day.
>>>
>>> Now conceptually, I would like to join the stream of incoming events to
>>> the dimension tables (simple hash join). we can consider two cases:
>>> 1) simpler, where I join the stream with the most recent version of the
>>> dictionaries. (So the result is accepted to be nondeterministic if the job
>>> is retried).
>>> 2) more advanced, where I would like to do temporal join of the stream
>>> with dictionaries snapshots that were valid at the time of the event. (This
>>> result should be deterministic).
>>>
>>> The end goal is to do aggregation of that joined stream, store results
>>> in Hive or more real-time analytical store (Druid).
>>>
>>> Now, could you please help me understand is any of these cases
>>> implementable with declarative Table/SQL API? With use of temporal joins,
>>> catalogs, Hive integration, JDBC connectors, or whatever beta features
>>> there are now. (I've read quite a lot of Flink docs about each of those,
>>> but I have a problem to compile this information in the final design.)
>>> Could you please help me understand how these components should
>>> cooperate?
>>> If that is impossible with Table API, can we come up with the easiest
>>> implementation using Datastream API ?
>>>
>>> Thanks a lot for any help!
>>> Krzysztof
>>>
>>

Reply via email to