Hi Miki,

Sorry for the late response.
There are basically two ways to implement an enrichment join as in your use
case.

1) Keep the metadata in the database and implement a job that reads the
stream from Kafka and queries the database in an Async I/O operator for every
stream record. This should be the easier implementation, but it will send
one query to the DB for each streamed record.
2) Replicate the metadata into Flink state and join the streamed records
with the state. This solution is more complex because you need to propagate
updates of the metadata (if there are any) into the Flink state. At the
moment, Flink lacks a few features to have a good implementation of this
approach, but there are some workarounds that help in certain cases.
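
To make the first approach a bit more concrete, here is a rough plain-Java
sketch of the pattern (one asynchronous lookup per record, results kept in
input order). It deliberately does not use the Flink API: the class name
AsyncEnrichmentSketch, the `lookup` function, and the in-memory map standing
in for the database are all assumptions made up for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

public class AsyncEnrichmentSketch {

    /**
     * Issues one asynchronous lookup per streamed key and waits for all of them.
     * `lookup` is a hypothetical stand-in for the database query.
     */
    public static List<String> enrich(List<String> keys,
                                      Function<String, String> lookup,
                                      ExecutorService pool) {
        // Start all lookups first so they can run in parallel on the pool.
        List<CompletableFuture<String>> pending = keys.stream()
                .map(key -> CompletableFuture.supplyAsync(
                        () -> key + ":" + lookup.apply(key), pool))
                .collect(Collectors.toList());
        // Collect the results in the original record order.
        return pending.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // In-memory map used instead of a real database (assumption).
        Map<String, String> fakeDb = Map.of("a", "meta-a", "b", "meta-b");
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<String> enriched = enrich(
                    List.of("a", "b", "a"),
                    key -> fakeDb.getOrDefault(key, "unknown"),
                    pool);
            System.out.println(enriched);
        } finally {
            pool.shutdown();
        }
    }
}
```

Note that the key "a" is looked up twice here, which is exactly the
one-query-per-record cost mentioned above.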

Note that Flink's SQL support does not add any advantage for either
approach. You should use the DataStream API (and possibly
ProcessFunctions).

I'd go for the first approach if one query per record is feasible.
Let me know if you need to tackle the second approach and I can give some
details on the workarounds I mentioned.
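
For reference, the basic idea of the second approach can be sketched in
plain Java as follows. This is only an illustration of "keep a local copy of
the metadata, apply updates to it, and join records against it". It does not
use the Flink API and does not cover the workarounds mentioned above. The
class StateJoinSketch and its method names are hypothetical, and the HashMap
merely stands in for Flink state.

```java
import java.util.HashMap;
import java.util.Map;

public class StateJoinSketch {

    // Local copy of the metadata; a stand-in for Flink state (assumption).
    private final Map<String, String> metadataByKey = new HashMap<>();

    /** Applies a metadata insert or update so that later records see the new value. */
    public void onMetadataUpdate(String key, String metadata) {
        metadataByKey.put(key, metadata);
    }

    /** Joins one streamed record against the local copy, without a database call. */
    public String onRecord(String key) {
        String metadata = metadataByKey.get(key);
        return key + ":" + (metadata == null ? "unknown" : metadata);
    }

    public static void main(String[] args) {
        StateJoinSketch join = new StateJoinSketch();
        join.onMetadataUpdate("a", "meta-v1");
        System.out.println(join.onRecord("a"));
        // A later update must be propagated, otherwise the join goes stale.
        join.onMetadataUpdate("a", "meta-v2");
        System.out.println(join.onRecord("a"));
        // No metadata has arrived for this key yet.
        System.out.println(join.onRecord("b"));
    }
}
```

The sketch also shows where the extra complexity comes from: every change
in the database has to reach onMetadataUpdate somehow, and records that
arrive before their metadata need some kind of handling.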

Best, Fabian

2018-04-16 20:38 GMT+02:00 Ken Krugler <kkrugler_li...@transpac.com>:

> Hi Miki,
>
> I haven’t tried mixing AsyncFunctions with SQL queries.
>
> Normally I’d create a regular DataStream workflow that first reads from
> Kafka, then has an AsyncFunction to read from the SQL database.
>
> If there are often duplicate keys in the Kafka-based stream, you could
> keyBy(key) before the AsyncFunction, and then cache the result of the SQL
> query.
>
> — Ken
>
> On Apr 16, 2018, at 11:19 AM, miki haiat <miko5...@gmail.com> wrote:
>
> Hi, thanks for the reply. I will try to break your reply down into the
> flow execution order.
>
> The first data stream will use AsyncIO and select the table.
> The second stream will be Kafka, and then I can join the streams and map it?
>
> If that is the case, then I will select the table only once, on load?
> How can I make sure that my stream table is "fresh"?
>
> I'm wondering: is there a way to use the Flink backend (RocksDB) and
> create a read/write-through mechanism?
>
> Thanks
>
> miki
>
>
>
> On Mon, Apr 16, 2018 at 2:45 AM, Ken Krugler <kkrugler_li...@transpac.com>
> wrote:
>
>> If the SQL data is all (or mostly all) needed to join against the data
>> from Kafka, then I might try a regular join.
>>
>> Otherwise it sounds like you want to use an AsyncFunction to do ad hoc
>> queries (in parallel) against your SQL DB.
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/asyncio.html
>>
>> — Ken
>>
>>
>> On Apr 15, 2018, at 12:15 PM, miki haiat <miko5...@gmail.com> wrote:
>>
>> Hi,
>>
>> I have a case of metadata enrichment and I'm wondering if my approach is
>> the correct way.
>>
>>    1. Input stream from Kafka.
>>    2. Metadata (MD) in MS SQL.
>>    3. Map to a new POJO.
>>
>> I need to extract a key from the Kafka stream and use it to select
>> some values from the SQL table.
>>
>> So I thought to use the Table/SQL API to select the MD table,
>> then convert the Kafka stream to a table and join the data by the stream
>> key.
>>
>> At the end I need to map the joined data to a new POJO and send it to
>> Elasticsearch.
>>
>> Any suggestions or different ways to solve this use case?
>>
>> thanks,
>> Miki
>>
>>
>>
>>
>> --------------------------
>> Ken Krugler
>> http://www.scaleunlimited.com
>> custom big data solutions & training
>> Hadoop, Cascading, Cassandra & Solr
>>
>>
>
> --------------------------------------------
> http://about.me/kkrugler
> +1 530-210-6378
>
>
