Re: [External] Re: From Kafka Stream to Flink

2020-03-28 Thread Maatary Okouya
Hi all,

Just wondering, what is the status at this point?

On Thu, Sep 19, 2019 at 4:38 PM Hequn Cheng  wrote:

> Hi,
>
> Fabian is totally right. Big thanks for the detailed answers and nice
> examples above.
>
> As for the PR, I am very sorry about the delay. It is mainly due to the
> merge of Blink and my recent switch to working on Flink Python.
> However, I think later versions of Blink will cover this feature
> natively as the merge progresses.
>
> Before that, I think we can use the solution Fabian provided above.
>
> There are some examples here [1][2] that may be helpful to you
> @Casado @Maatary.
> In [1], the test case closely matches your scenario (a join performed
> after groupby + last_value). It also provides the UDAF you want and shows
> how to register it.
> In [2], the test shows how to use the built-in last_value in SQL. Note
> that the built-in last_value UDAF is only supported by the blink-planner
> from Flink 1.9.0 onwards. If you are using the flink-planner (or an
> earlier version), you can register the last_value UDAF with the
> TableEnvironment as shown in [1].
>
> Feel free to ask if there are other problems.
>
> Best, Hequn
> [1]
> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SubplanReuseTest.scala#L207
> [2]
> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala#L228
>
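A minimal sketch of the registration path described above, assuming the
flink-planner on Flink 1.9 and a simplified, string-valued LAST_VAL
aggregate (the class and names below are illustrative, not taken from the
linked tests):

import org.apache.flink.table.functions.AggregateFunction;

// Accumulator that remembers the most recent value seen for a group.
public class LastValAcc {
    public String value;
}

// Simplified LAST_VAL: keeps the latest non-null value per group key.
public class LastVal extends AggregateFunction<String, LastValAcc> {

    @Override
    public LastValAcc createAccumulator() {
        return new LastValAcc();
    }

    // Invoked for every input row of the group.
    public void accumulate(LastValAcc acc, String in) {
        if (in != null) {
            acc.value = in;
        }
    }

    @Override
    public String getValue(LastValAcc acc) {
        return acc.value;
    }
}

// Registered once with the TableEnvironment, the function is usable in SQL:
// tableEnv.registerFunction("LAST_VAL", new LastVal());
// tableEnv.sqlQuery("SELECT key, LAST_VAL(v1) FROM appendOnlyTable GROUP BY key");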
> On Thu, Sep 19, 2019 at 9:40 PM Casado Tejedor, Rubén <
> ruben.casado.teje...@accenture.com> wrote:
>
>> Thanks Fabian. @Hequn Cheng, could you share the status? Thanks for your
>> amazing work!
>>
>>
>>
>> *From: *Fabian Hueske 
>> *Date: *Friday, 16 August 2019, 9:30
>> *To: *"Casado Tejedor, Rubén" 
>> *CC: *Maatary Okouya , miki haiat <
>> miko5...@gmail.com>, user , Hequn Cheng <
>> chenghe...@gmail.com>
>> *Subject: *Re: [External] Re: From Kafka Stream to Flink
>>
>>
>>
>> Hi Ruben,
>>
>>
>>
>> Work on this feature has already started [1], but stalled a bit (probably
>> due to the effort of merging the new Blink query processor).
>>
>> Hequn (in CC) is the guy working on upsert table ingestion; maybe he can
>> share what the status of this feature is.
>>
>>
>>
>> Best, Fabian
>>
>>
>>
>> [1] https://github.com/apache/flink/pull/6787
>>
>>
>>
>> Am Di., 13. Aug. 2019 um 11:05 Uhr schrieb Casado Tejedor, Rubén <
>> ruben.casado.teje...@accenture.com>:
>>
>> Hi
>>
>>
>>
>> Do you have an expected version of Flink that will include the capability
>> to ingest an upsert stream as a dynamic table? We have such a need in our
>> current project. What we have done is emulate that behavior by working at a
>> low level with state (e.g., update the existing value if the key exists,
>> create a new value if it does not). But we cannot use SQL, which would help
>> us do it faster.
>>
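The low-level emulation described above might look roughly like the
following sketch (string values keyed upstream via keyBy; all class and
type names here are illustrative):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Upsert emulation: overwrite the stored value if the key exists, create it
// if it does not, and forward the current version downstream.
public class UpsertEmulation extends KeyedProcessFunction<String, String, String> {

    private transient ValueState<String> latest;

    @Override
    public void open(Configuration parameters) {
        latest = getRuntimeContext().getState(
                new ValueStateDescriptor<>("latest", String.class));
    }

    @Override
    public void processElement(String in, Context ctx, Collector<String> out)
            throws Exception {
        latest.update(in);  // insert or overwrite the previous version per key
        out.collect(in);    // downstream sees only the newest version
    }
}

// Usage: changelog.keyBy(v -> extractKey(v)).process(new UpsertEmulation());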
>>
>>
>> Our use case is many small Flink jobs that have to do something like:
>>
>>
>>
>> SELECT *some fields*
>>
>> FROM *t1* INNER JOIN *t2* ON *t1.id = t2.id* (maybe joining 3+ tables)
>>
>> WHERE *some conditions on fields*;
>>
>>
>>
>> We need the results of those queries to take into account only the latest
>> version of each row. The results are inserted/updated in an in-memory K-V
>> database for fast access.
>>
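Combining Fabian's LAST_VAL suggestion with this use case, a join over
latest values only could be expressed by first collapsing each append-only
table into an upsert view. A rough sketch, assuming a TableEnvironment tEnv
with a LAST_VAL aggregate registered as shown earlier (all table and column
names are illustrative):

// Collapse each append-only table to its latest row per key, then join.
Table result = tEnv.sqlQuery(
    "SELECT l1.id, l1.f1, l2.f2 " +
    "FROM (SELECT id, LAST_VAL(f1) AS f1 FROM t1 GROUP BY id) AS l1 " +
    "JOIN (SELECT id, LAST_VAL(f2) AS f2 FROM t2 GROUP BY id) AS l2 " +
    "ON l1.id = l2.id " +
    "WHERE l2.f2 IS NOT NULL");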
>>
>>
>> Thanks in advance!
>>
>>
>>
>> Best
>>
>>
>>
>> *From: *Fabian Hueske 
>> *Date: *Wednesday, 7 August 2019, 11:08
>> *To: *Maatary Okouya 
>

Re: From Kafka Stream to Flink

2019-08-06 Thread Maatary Okouya
Fabian,

Ultimately, I just want to perform a join on the last values for each key.

On Tue, Aug 6, 2019 at 8:07 PM Maatary Okouya 
wrote:

> Fabian,
>
> could you please clarify the following statement:
>
> However, joining an append-only table with this view without adding a
> temporal join condition means that the stream is fully materialized as
> state.
> This is because previously emitted results must be updated when the view
> changes.
> How much state the query will need to maintain really depends on the
> semantics of the join and the query that you need.
>
>
> I am not sure I understand the problem. If I have two append-only tables
> and perform some join on them, what's the issue?
>
>
> On Tue, Aug 6, 2019 at 8:03 PM Maatary Okouya 
> wrote:
>
>> Thank you for the clarification. Really appreciated.
>>
>> Is LAST_VAL part of the API?
>>
>> On Fri, Aug 2, 2019 at 10:49 AM Fabian Hueske  wrote:
>>
>>> Hi,
>>>
>>> Flink does not distinguish between streams and tables. For the Table API
>>> / SQL, there are only tables that change over time, i.e., dynamic
>>> tables.
>>> A Stream in the Kafka Streams or KSQL sense is, in Flink, a Table with
>>> append-only changes, i.e., records are only inserted and never deleted or
>>> modified.
>>> A Table in the Kafka Streams or KSQL sense is, in Flink, a Table with
>>> upsert and delete changes, i.e., the table has a unique key and records are
>>> inserted, deleted, or updated per key.
>>>
>>> In the current version, Flink does not have native support for ingesting an
>>> upsert stream as a dynamic table (right now only append-only tables can be
>>> ingested; native support for upsert tables will be added soon).
>>> However, on an append-only table you can create a view that behaves like an
>>> upsert table, using the following SQL query:
>>>
>>> SELECT key, LAST_VAL(v1), LAST_VAL(v2), ...
>>> FROM appendOnlyTable
>>> GROUP BY key
>>>
>>> Given this view, you can run all kinds of SQL queries on it.
>>> However, joining an append-only table with this view without adding a
>>> temporal join condition means that the stream is fully materialized as
>>> state.
>>> This is because previously emitted results must be updated when the view
>>> changes.
>>> How much state the query will need to maintain really depends on the
>>> semantics of the join and the query that you need.
>>>
>>> An alternative to using the Table API / SQL and its dynamic table
>>> abstraction is to use Flink's DataStream API and ProcessFunctions.
>>> These APIs are lower level and expose access to state and timers,
>>> which are the core ingredients of stream processing.
>>> You can implement pretty much all the logic of KStreams, and more, in
>>> these APIs.
>>>
>>> Best, Fabian
>>>
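The "join on the last values for each key" that this thread is after can
also be sketched at the DataStream level with two keyed streams and a
CoProcessFunction (a rough illustration with string types; not code from
this thread):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

// Keeps the latest value per key from each input and re-emits the joined
// pair whenever either side is updated.
public class LatestValueJoin extends CoProcessFunction<String, String, String> {

    private transient ValueState<String> left;
    private transient ValueState<String> right;

    @Override
    public void open(Configuration conf) {
        left = getRuntimeContext().getState(
                new ValueStateDescriptor<>("left", String.class));
        right = getRuntimeContext().getState(
                new ValueStateDescriptor<>("right", String.class));
    }

    @Override
    public void processElement1(String value, Context ctx, Collector<String> out)
            throws Exception {
        left.update(value);  // only the latest version per key is kept
        if (right.value() != null) {
            out.collect(value + "," + right.value());
        }
    }

    @Override
    public void processElement2(String value, Context ctx, Collector<String> out)
            throws Exception {
        right.update(value);
        if (left.value() != null) {
            out.collect(left.value() + "," + value);
        }
    }
}

// Usage: both inputs must be keyed on the join key:
// s1.keyBy(v -> key(v)).connect(s2.keyBy(v -> key(v))).process(new LatestValueJoin());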
>>>
>>> Am Di., 23. Juli 2019 um 13:06 Uhr schrieb Maatary Okouya <
>>> maatarioko...@gmail.com>:
>>>
>>>> I would like to have a KTable, or maybe in Flink terms a dynamic Table,
>>>> that only contains the latest value for each keyed record. This would allow
>>>> me to perform aggregations and joins based on the latest state of every
>>>> record, as opposed to every record over time or over a period of time.
>>>>
>>>> On Sun, Jul 21, 2019 at 8:21 AM miki haiat  wrote:
>>>>
>>>>> Can you elaborate more about your use case?
>>>>>
>>>>>
>>>>> On Sat, Jul 20, 2019 at 1:04 AM Maatary Okouya <
>>>>> maatarioko...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I have been a user of Kafka Streams so far. However, I have been faced
>>>>>> with several limitations, in particular in performing joins on KTables.
>>>>>>
>>>>>> I was wondering what the approach in Flink is to achieve (1) the
>>>>>> concept of a KTable, i.e. a Table that represents a changelog (only the
>>>>>> latest version of all keyed records), and (2) joining those.
>>>>>>
>>>>>> There are currently a lot of limitations around that in Kafka Streams,
>>>>>> and I need this for an ETL process where I need to mirror
>>>>>> entire databases into Kafka and then join the tables to emit the
>>>>>> logical entities into Kafka topics. I was hoping that somehow I could
>>>>>> achieve that by using Flink as an intermediary.
>>>>>>
>>>>>> I can see that you support all kinds of joins, but I just don't see the
>>>>>> notion of a KTable.
>>>>>>
>>>>>>
>>>>>>


Re: Non blocking operation in Apache flink

2016-05-25 Thread Maatary Okouya
Thank you,

I will study that. It is a bit more raw, I would say. The thing is, my source
is Kafka. I will have to see how to combine all of that in the
most elegant way possible. Will get back to you on this after I scratch my
head enough.

Best,

Daniel

On Wed, May 25, 2016 at 11:02 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> I see what you mean now. The Akka Streams API is very interesting in how
> it allows async calls.
>
> For Flink, I think you could implement it as a custom source that listens
> to the change stream, starts futures to fetch data from the database, and
> emits elements when the futures complete. I quickly sketched such an
> approach:
>
>
> // Note: ChangelogConnection, DB, Change, and Future are placeholder types
> // sketching the user's database client, not Flink or JDK classes.
> public static class MyDBSource implements ParallelSourceFunction<String> {
>     private static final long serialVersionUID = 1L;
>
>     private volatile boolean running = true;
>
>     @Override
>     public void run(final SourceContext<String> ctx) throws Exception {
>         ChangelogConnection log = new ChangelogConnection();
>         DB db = new DB();
>
>         final Object checkpointLock = ctx.getCheckpointLock();
>
>         while (running) {
>             // try and fetch the next changelog item
>             Change change = log.getNextChange();
>
>             // asynchronous lookup; the callback fires when the data arrives
>             db.fetch(change, new Future() {
>                 public void complete(String data) {
>                     // emit under the checkpoint lock to stay consistent
>                     // with checkpointing
>                     synchronized (checkpointLock) {
>                         ctx.collect(data);
>                     }
>                 }
>             });
>         }
>     }
>
>     @Override
>     public void cancel() {
>         running = false;
>     }
> }
>
> I hope that helps.
>
> -Aljoscha
>
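For completeness, such a source would be attached to a job in the usual way
(a sketch; the environment setup is assumed, not taken from this thread):

// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// DataStream<String> changes = env.addSource(new MyDBSource());
// changes.print();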
> On Wed, 25 May 2016 at 12:21 Maatary Okouya <maatarioko...@gmail.com>
> wrote:
>
>> Maybe the following can illustrate better what I mean:
>> http://doc.akka.io/docs/akka/2.4.6/scala/stream/stream-integrations.html#Integrating_with_External_Services
>>
>> On Wed, May 25, 2016 at 5:16 AM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> Hi,
>>> there is no functionality to have asynchronous calls in user functions
>>> in Flink.
>>>
>>> The asynchronous action feature in Spark is also not meant for such
>>> things; it is targeted at programs that need to pull all data to the
>>> application master. In Flink this is not necessary because you can specify
>>> a whole plan of operations before executing them.
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Tue, 24 May 2016 at 20:43 Maatary Okouya <maatarioko...@gmail.com>
>>> wrote:
>>>
>>>> I'm looking for a way to avoid thread starvation in my tasks by
>>>> returning futures, but I don't see how that is possible.
>>>>
>>>> Hence I would like to know how Flink handles the case where, in your job,
>>>> you have to perform network calls (I use Akka HTTP or Spray) or any IO
>>>> operation and use the result of it.
>>>>
>>>> In Spark I see asynchronous actions and so on. I don't see any
>>>> equivalent in Apache Flink. How does it work? Is it supported, or do
>>>> network calls and IO operations have to be synchronous?
>>>>
>>>> Any help, indications, reads, and so on would be appreciated.
>>>>
>>


Re: Non blocking operation in Apache flink

2016-05-25 Thread Maatary Okouya
Thank you for your answer.

Maybe I should have mentioned that I am at the beginning with both
frameworks, making a choice by evaluating their capabilities. I know
Akka Streams better.

So my question is simple. Let's say that:

1) I have a stream of events that are simply notifications that some item
has changed somewhere in a database.

2) For each of those events, I need to query the DB to get the new version
of the item.

3) I apply some transformation.

4) I connect to another DB and write the results.

My question here is as follows: how am I supposed to make the calls to both
DBs, in and out? Should those calls be synchronous?

I come from Scala and Akka, where we typically avoid making blocking calls
and use futures all the way in this kind of situation. Akka Streams allows
that fine-grained level of detail for stream processing, for instance. This
avoids thread starvation: while I make the IO operation, the thread can be
used for something else.

So I believe that somehow this can be reproduced with both frameworks.

Can you please explain how this is supposed to be handled in Flink?


On Wed, May 25, 2016 at 5:17 AM Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
> there is no functionality to have asynchronous calls in user functions in
> Flink.
>
> The asynchronous action feature in Spark is also not meant for such
> things; it is targeted at programs that need to pull all data to the
> application master. In Flink this is not necessary because you can specify
> a whole plan of operations before executing them.
>
> Cheers,
> Aljoscha
>
> On Tue, 24 May 2016 at 20:43 Maatary Okouya <maatarioko...@gmail.com>
> wrote:
>
>> I'm looking for a way to avoid thread starvation in my tasks by
>> returning futures, but I don't see how that is possible.
>>
>> Hence I would like to know how Flink handles the case where, in your job,
>> you have to perform network calls (I use Akka HTTP or Spray) or any IO
>> operation and use the result of it.
>>
>> In Spark I see asynchronous actions and so on. I don't see any equivalent
>> in Apache Flink. How does it work? Is it supported, or do network calls
>> and IO operations have to be synchronous?
>>
>> Any help, indications, reads, and so on would be appreciated.
>>
>
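Later Flink releases added a dedicated Async I/O API (AsyncFunction /
AsyncDataStream) for exactly this non-blocking enrichment pattern. A
minimal sketch, assuming a hypothetical asynchronous database client
(AsyncDbClient is illustrative, not a real library):

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// Issues one non-blocking lookup per element; the operator keeps processing
// other elements while the future is in flight, avoiding thread starvation.
public class DbLookup extends RichAsyncFunction<String, String> {

    private transient AsyncDbClient client;  // hypothetical async DB client

    @Override
    public void open(Configuration conf) {
        client = new AsyncDbClient();
    }

    @Override
    public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
        CompletableFuture<String> f = client.get(key);  // returns immediately
        f.thenAccept(value -> resultFuture.complete(Collections.singleton(value)));
    }
}

// Wiring, with a 1-second timeout and at most 100 in-flight requests:
// DataStream<String> enriched =
//     AsyncDataStream.unorderedWait(input, new DbLookup(), 1, TimeUnit.SECONDS, 100);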

