Kafka dynamic topic for Sink in SQL

2021-05-20 Thread Benoît Paris
Hi all!

I'm looking for a way to write to different Kafka topics based on some
column value in SQL.

I think it's possible in Java, using KafkaSerializationSchema
and ProducerRecord(topic, ...), but I was wondering if I could somehow
access that feature from SQL.
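
For reference, a minimal sketch of the DataStream-level version of that (the
routing field, the payload serialization, and the producer wiring are
assumptions for illustration, not the actual connector code):

import java.nio.charset.StandardCharsets;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.types.Row;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TopicRoutingSchema implements KafkaSerializationSchema<Row> {
    @Override
    public ProducerRecord<byte[], byte[]> serialize(Row row, Long timestamp) {
        // Assumption: field 0 of the row holds the target topic name.
        String topic = (String) row.getField(0);
        byte[] value = row.toString().getBytes(StandardCharsets.UTF_8);
        return new ProducerRecord<>(topic, value);
    }
}

// used as: new FlinkKafkaProducer<>("fallback-topic", new TopicRoutingSchema(),
//          kafkaProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)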

I'm also trying to evaluate the amount of work required to implement it
myself, by subclassing the Kafka SQL connector just to add that feature.

Another alternative for me is to preprocess the SQL, detect the Kafka sinks,
force a DataStream conversion, and replace the Kafka SQL sink with an
equivalent DataStream sink that does the topic routing (but this feels
brittle and hard to maintain compared to having the option in the SQL sink
configuration).

All comments/opinions/advice welcome!
Cheers
Ben


Interesting article about correctness and latency

2021-04-18 Thread Benoît Paris
Hi all!

I read this very interesting and refreshing article today, about
correctness and (vs?) latency in streaming systems. I thought I'd share it.

https://scattered-thoughts.net/writing/internal-consistency-in-streaming-systems/

With some comments on Hacker News:

https://news.ycombinator.com/item?id=26851803

Cheers
Ben


Re: Writing retract streams to Kafka

2020-03-05 Thread Benoît Paris
Hi Gyula,

I'm afraid the conversion that exposes retractions vs. inserts can't be done
in pure SQL (though I'd love that feature).

You might want to go lower level and implement a RetractStreamTableSink
[1][2] that you would wrap around a KafkaTableSink [3]. This will give you
an emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) method, in
which the Boolean flag gives you an 'accumulate' or 'retract' signal.
You can then filter the DataStream accordingly before passing it to the
KafkaTableSink.
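
A minimal sketch of that filtering step (the wrapping class and the
wrappedKafkaTableSink field are assumptions, not tested code):

// inside a custom RetractStreamTableSink<Row> that wraps a KafkaTableSink
@Override
public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
    DataStream<Row> insertsOnly = dataStream
        .filter(change -> change.f0)   // true = accumulate, false = retract
        .map(change -> change.f1)
        .returns(getRecordType());     // TypeInformation<Row>
    return wrappedKafkaTableSink.consumeDataStream(insertsOnly);
}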

Hope this helps.

Best regards
Benoît

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/sinks/RetractStreamTableSink.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#retractstreamtablesink
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.html

On Thu, Mar 5, 2020 at 2:50 PM Gyula Fóra  wrote:

> Hi Roman,
>
> This is the core logic:
>
> CREATE TABLE QueryResult (
>   queryId  BIGINT,
>   itemId   STRING,
>   quantity INT
> ) WITH (
> 'connector.type' = 'kafka',
> 'connector.version' = 'universal',
> 'connector.topic'   = 'query.output.log.1',
> 'connector.properties.bootstrap.servers' = '',
> 'format.type' = 'json'
> );
>
> INSERT INTO QueryResult
> SELECT q.queryId, t.itemId, sum(t.quantity) AS quantity
> FROM
>   ItemTransactions AS t,
>   Queries AS q
> WHERE
>   t.itemId = q.itemId AND
>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time
> GROUP BY
>   t.itemId, q.event_time, q.queryId;
>
> And the error I get is:
> org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL
> update statement.
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:697)
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
> at
> org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:548)
> at
> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:310)
> at java.util.Optional.ifPresent(Optional.java:159)
> at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211)
> at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
> Caused by: org.apache.flink.table.api.TableException:
> AppendStreamTableSink requires that Table has only insert changes.
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>
> I am wondering what I could do to simply pump the result updates to
> Kafka here.
>
> Gyula
>
> On Thu, Mar 5, 2020 at 2:37 PM Khachatryan Roman <
> khachatryan.ro...@gmail.com> wrote:
>
>> Hi Gyula,
>>
>> Could you provide the code of your Flink program, the error with
>> stacktrace and the Flink version?
>>
>> Thanks,
>> Roman
>>
>>
>> On Thu, Mar 5, 2020 at 2:17 PM Gyula Fóra  wrote:
>>
>>> Hi All!
>>>
>>> Excuse my stupid question, I am pretty new to the Table/SQL API and I am
>>> trying to play around with it implementing and running a few use-cases.
>>>
>>> I have a simple window join + aggregation, grouped on some id that I
>>> want to write to Kafka but I am hitting the following error:
>>>
>>> "AppendStreamTableSink requires that Table has only insert changes."
>>>
>>> If I understand correctly the problem here is that since updates are
>>> possible within a single group, we have a retract stream and the Kafka Sink
>>> cannot handle that. I tried to search for the solution but I haven't found
>>> any satisfying answers.
>>>
>>> How can I simply tell the INSERT logic to ignore previous values and
>>> just always keep sending the latest (like you would see it on the CLI
>>> output)?
>>>
>>> Thank you!
>>> Gyula
>>>
>>

-- 
Benoît Paris
Ingénieur Machine Learning Explicable
Tél : +33 6 60 74 23 00
http://benoit.paris
http://explicable.ml


Re: Colocating Sub-Tasks across jobs / Sharing Task Slots across jobs

2020-02-25 Thread Benoît Paris
Hi Xintong

Thank you for your answer. This seems promising, I'll look into it.

Do you believe the code of the operators of the restarted Region can be
changed between restarts?

Best
Benoît


On Tue, Feb 25, 2020 at 2:30 AM Xintong Song  wrote:

> Hi Ben,
>
> You can not share slots across jobs. Flink adopts a two-level slot
> scheduling mechanism. Slots are firstly allocated to each job, then the
> JobMaster decides which tasks should be executed in which slots, i.e. slot
> sharing.
>
> I think what you are looking for is Pipelined Region Restart Strategy [1],
> which restarts only the tasks connected by pipelined edges instead of the
> whole job graph.
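
(For reference, a sketch of how that failover strategy is typically enabled
in flink-conf.yaml on Flink 1.9/1.10; check the docs linked below [1] for the
exact key in your version:

    jobmanager.execution.failover-strategy: region
)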
>
> Thank you~
>
> Xintong Song
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/task_failure_recovery.html#restart-pipelined-region-failover-strategy
>
>
>
> On Mon, Feb 24, 2020 at 11:28 PM Benoît Paris <
> benoit.pa...@centraliens-lille.org> wrote:
>
>> Hello all!
>>
>> I have a setup composed of several streaming pipelines. These have
>> different deployment lifecycles: I want to be able to modify and redeploy
>> the topology of one while the other is still up. I am thus putting them in
>> different jobs.
>>
>> The problem is I have a Co-Location constraint between one subtask of
>> each pipeline; I'd like them to run on the same TaskSlots, much like if
>> they were sharing a TaskSlot; or at least have them on the same JVM.
>>
>> A semi-official feature
>> "DataStream.getTransformation().setCoLocationGroupKey(stringKey)" [1]
>> exists for this, but it seems to be tied to the Sub-Tasks actually being
>> able to be co-located on the same Task Slot.
>>
>> The documentation mentions [2] that it might be impossible to do ("Flink
>> allows subtasks to share slots even if they are subtasks of different
>> tasks, so long as they are *from the same job*").
>>
>> The streaming pipelines are numerous (about 10), and I can't afford to
>> increase the number of TaskSlots per TaskManager. I also would like to
>> avoid putting all the pipelines in the same job, restarting it every time a
>> single one changes.
>>
>> I'd like to have mailing list's informed opinion about this, if there are
>> workarounds, or if I could reconsider my problem under another angle.
>>
>> Cheers
>> Ben
>>
>> [1]
>> https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java#L116
>>
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-master/concepts/runtime.html#task-slots-and-resources
>>
>

-- 
Benoît Paris
Ingénieur Machine Learning Explicable
Tél : +33 6 60 74 23 00
http://benoit.paris
http://explicable.ml


Colocating Sub-Tasks across jobs / Sharing Task Slots across jobs

2020-02-24 Thread Benoît Paris
Hello all!

I have a setup composed of several streaming pipelines. These have
different deployment lifecycles: I want to be able to modify and redeploy
the topology of one while the other is still up. I am thus putting them in
different jobs.

The problem is I have a Co-Location constraint between one subtask of each
pipeline; I'd like them to run on the same TaskSlots, much like if they
were sharing a TaskSlot; or at least have them on the same JVM.

A semi-official feature,
"DataStream.getTransformation().setCoLocationGroupKey(stringKey)" [1],
exists for this, but it seems to be tied to the Sub-Tasks actually being able
to be co-located on the same Task Slot.
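
For reference, a minimal sketch of how that call is wired in (within a single
job; the two sources are hypothetical placeholders):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Long> pipelineA = env.addSource(new PipelineASource());   // hypothetical
DataStream<Long> pipelineB = env.addSource(new PipelineBSource());   // hypothetical

// Subtasks sharing the same co-location group key are scheduled into the same
// slots (hence the same TaskManager JVMs) -- but only within a single job.
pipelineA.getTransformation().setCoLocationGroupKey("shared-colocation-key");
pipelineB.getTransformation().setCoLocationGroupKey("shared-colocation-key");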

The documentation mentions [2] that it might be impossible to do ("Flink
allows subtasks to share slots even if they are subtasks of different
tasks, so long as they are *from the same job*").

The streaming pipelines are numerous (about 10), and I can't afford to
increase the number of TaskSlots per TaskManager. I also would like to
avoid putting all the pipelines in the same job, restarting it every time a
single one changes.

I'd like to have mailing list's informed opinion about this, if there are
workarounds, or if I could reconsider my problem under another angle.

Cheers
Ben

[1]
https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java#L116

[2]
https://ci.apache.org/projects/flink/flink-docs-master/concepts/runtime.html#task-slots-and-resources


Re: where does flink store the intermediate results of a join and what is the key?

2020-01-27 Thread Benoît Paris
Dang, what a massive PR: 2,118 files changed, +104,104 −29,161 lines.
Thanks for the details, Jark!

On Mon, Jan 27, 2020 at 4:07 PM Jark Wu  wrote:

> Hi Kant,
> Having a custom state backend is very difficult and is not recommended.
>
> Hi Benoît,
> Yes, the "Query on the intermediate state is on the roadmap" I
> mentioned is referring to integrate Table API & SQL with Queryable State.
> We also have an early issue FLINK-6968 to tracks this.
>
> Best,
> Jark
>
>
> On Fri, 24 Jan 2020 at 00:26, Benoît Paris <
> benoit.pa...@centraliens-lille.org> wrote:
>
>> Hi all!
>>
>> @Jark, out of curiosity, would you be so kind as to expand a bit on "Query
>> on the intermediate state is on the roadmap"?
>> Are you referring to working on QueryableStateStream/QueryableStateClient
>> [1], or around "FOR SYSTEM_TIME AS OF" [2], or on other APIs/concepts (is
>> there a FLIP?)?
>>
>> Cheers
>> Ben
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#querying-state
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.html#temporal-table
>>
>>
>> On Thu, Jan 23, 2020 at 6:40 AM kant kodali  wrote:
>>
>>> Is it a common practice to have a custom state backend? If so, what
>>> would be a popular custom backend?
>>>
>>> Can I use Elasticsearch as a state backend?
>>>
>>> Thanks!
>>>
>>> On Wed, Jan 22, 2020 at 1:42 AM Jark Wu  wrote:
>>>
>>>> Hi Kant,
>>>>
>>>> 1) A list of rows would also be sufficient in this case. Using a MapState
>>>> makes it faster to retract a row, and saves storage.
>>>>
>>>> 2) The State Processor API is usually used to process savepoints. I’m afraid
>>>> the performance is not good enough to use it for querying.
>>>> On the other side, AFAIK, the State Processor API requires the uid of the
>>>> operator. However, uids of operators are not set in Table API & SQL,
>>>> so I’m not sure whether it works or not.
>>>>
>>>> 3) You can have a custom state backend by
>>>> implementing the org.apache.flink.runtime.state.StateBackend interface, and
>>>> using it via `env.setStateBackend(…)`.
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>> On Wed, 22 Jan 2020 at 14:16, kant kodali  wrote:
>>>>
>>>>> Hi Jark,
>>>>>
>>>>> 1) Shouldn't it be col1 to a List of rows? Multiple rows can have the
>>>>> same joining key, right?
>>>>>
>>>>> 2) Can I use state processor API
>>>>> <https://flink.apache.org/feature/2019/09/13/state-processor-api.html>
>>>>> from an external application to query the intermediate results in near
>>>>> real-time? I thought querying rocksdb state is a widely requested feature.
>>>>> It would be really great to consider this feature for 1.11
>>>>>
>>>>> 3) Is there any interface where I can implement my own state backend?
>>>>>
>>>>> Thanks!
>>>>>
>>>>>
>>>>> On Tue, Jan 21, 2020 at 6:08 PM Jark Wu  wrote:
>>>>>
>>>>>> Hi Kant,
>>>>>>
>>>>>> 1) Yes, it will be stored in the RocksDB state backend.
>>>>>> 2) In the old planner, the left state is the same as the right state;
>>>>>> both are a `MapState`.
>>>>>> It is a 2-level map structure: `col1`, the join key, is the
>>>>>> first-level key of the state. The key of the MapState is the
>>>>>> input row,
>>>>>> and the `count` is the number of occurrences of this row; the expiredTime
>>>>>> indicates when to clean up this row (to avoid infinite state size). You can
>>>>>> find the source code here [1].
>>>>>> In the blink planner, the state structure will be more complex, as it is
>>>>>> determined by the meta-information of the upstream. You can see the source
>>>>>> code of the blink planner here [2].
>>>>>> 3) Currently, the intermediate state is not exposed to users.
>>>>>> Usually, users should write the query result to an external system (like
>>>>>> Mysql) and query the external system.
>>>>>> Querying the intermediate state is on the roadmap, but I guess it
>>>>>> is not in the 1.11 plan.

Re: where does flink store the intermediate results of a join and what is the key?

2020-01-23 Thread Benoît Paris
Hi all!

@Jark, out of curiosity, would you be so kind as to expand a bit on "Query
on the intermediate state is on the roadmap"?
Are you referring to working on QueryableStateStream/QueryableStateClient
[1], or around "FOR SYSTEM_TIME AS OF" [2], or on other APIs/concepts (is
there a FLIP?)?

Cheers
Ben

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#querying-state
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.html#temporal-table
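
(For context, the Queryable State client route in [1] looks roughly like the
sketch below; the host, port, job id, state name, and types are placeholders
I made up, not values from an actual job:)

QueryableStateClient client = new QueryableStateClient("taskmanager-host", 9069);

ValueStateDescriptor<Long> descriptor =
    new ValueStateDescriptor<>("join-state", Long.class);   // hypothetical state name/type

CompletableFuture<ValueState<Long>> future = client.getKvState(
    jobId,                                   // org.apache.flink.api.common.JobID of the running job
    "my-queryable-state-name",               // name passed to .asQueryableState(...)
    "some-key",
    BasicTypeInfo.STRING_TYPE_INFO,
    descriptor);

Long value = future.get().value();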


On Thu, Jan 23, 2020 at 6:40 AM kant kodali  wrote:

> Is it a common practice to have a custom state backend? If so, what would
> be a popular custom backend?
>
> Can I use Elasticsearch as a state backend?
>
> Thanks!
>
> On Wed, Jan 22, 2020 at 1:42 AM Jark Wu  wrote:
>
>> Hi Kant,
>>
>> 1) A list of rows would also be sufficient in this case. Using a MapState
>> makes it faster to retract a row, and saves storage.
>>
>> 2) The State Processor API is usually used to process savepoints. I’m afraid
>> the performance is not good enough to use it for querying.
>> On the other side, AFAIK, the State Processor API requires the uid of the
>> operator. However, uids of operators are not set in Table API & SQL,
>> so I’m not sure whether it works or not.
>>
>> 3) You can have a custom state backend by
>> implementing the org.apache.flink.runtime.state.StateBackend interface, and
>> using it via `env.setStateBackend(…)`.
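
(A minimal sketch of wiring a backend in that way -- here the stock RocksDB
backend rather than a custom one; the checkpoint path is a made-up example:)

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// RocksDBStateBackend(String checkpointDataUri, boolean incrementalCheckpoints) throws IOException
env.setStateBackend(new RocksDBStateBackend("file:///tmp/flink-checkpoints", true));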
>>
>> Best,
>> Jark
>>
>> On Wed, 22 Jan 2020 at 14:16, kant kodali  wrote:
>>
>>> Hi Jark,
>>>
>>> 1) Shouldn't it be col1 to a List of rows? Multiple rows can have the
>>> same joining key, right?
>>>
>>> 2) Can I use state processor API
>>> <https://flink.apache.org/feature/2019/09/13/state-processor-api.html>
>>> from an external application to query the intermediate results in near
>>> real-time? I thought querying rocksdb state is a widely requested feature.
>>> It would be really great to consider this feature for 1.11
>>>
>>> 3) Is there any interface where I can implement my own state backend?
>>>
>>> Thanks!
>>>
>>>
>>> On Tue, Jan 21, 2020 at 6:08 PM Jark Wu  wrote:
>>>
>>>> Hi Kant,
>>>>
>>>> 1) Yes, it will be stored in the RocksDB state backend.
>>>> 2) In the old planner, the left state is the same as the right state;
>>>> both are a `MapState`.
>>>> It is a 2-level map structure: `col1`, the join key, is the
>>>> first-level key of the state. The key of the MapState is the input
>>>> row,
>>>> and the `count` is the number of occurrences of this row; the expiredTime
>>>> indicates when to clean up this row (to avoid infinite state size). You can
>>>> find the source code here [1].
>>>> In the blink planner, the state structure will be more complex, as it is
>>>> determined by the meta-information of the upstream. You can see the source
>>>> code of the blink planner here [2].
>>>> 3) Currently, the intermediate state is not exposed to users. Usually,
>>>> users should write the query result to an external system (like Mysql) and
>>>> query the external system.
>>>> Querying the intermediate state is on the roadmap, but I guess it
>>>> is not in the 1.11 plan.
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>> [1]:
>>>> http://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala#L61
>>>> [2]:
>>>> https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/JoinRecordStateViews.java#L45
>>>>
>>>>
>>>> On Tue, Jan 21, 2020 at 18:01, kant kodali  wrote:
>>>>
>>>> Hi All,
>>>>
>>>> If I run a query like this
>>>>
>>>> StreamTableEnvironment.sqlQuery("select * from table1 join table2 on
>>>> table1.col1 = table2.col1")
>>>>
>>>> 1) Where will flink store the intermediate result? Imagine
>>>> flink-conf.yaml says state.backend = 'rocksdb'
>>>>
>>>> 2) If the intermediate results are stored in RocksDB, then what are the
>>>> key and value in this case (given the query above)?
>>>>
>>>> 3) What is the best way to query these intermediate results from an
>>>> external application? while the job is running and while the job is not
>>>> running?
>>>>
>>>> Thanks!
>>>>
>>>>
>>>>

-- 
Benoît Paris
Ingénieur Machine Learning Explicable
Tél : +33 6 60 74 23 00
http://benoit.paris
http://explicable.ml


TableSource being duplicated

2020-01-22 Thread Benoît Paris
Hello all!

I'm having a problem with a TableSource's DataStream being duplicated when
pulled on by 2 sinks.

I understand that sometimes the best plan might just be to duplicate the
TableSource/SourceFunction and read it twice; but in my case I can't quite
reproduce the data the way, say, Kafka would. I just need the SourceFunction
and DataStream provided by the TableSource not to be duplicated.

As a workaround to this issue, I introduce some sort of materialization
barrier that makes the planner pull only on one instance of the
TableSource/SourceFunction:
Instead of:

tEnv.registerTableSource("foo_table", new FooTableSource());

I convert it to an Append Stream, and back again to a Table:

tEnv.registerTableSource("foo_table_source", new FooTableSource());
Table sourceTable = tEnv.sqlQuery("SELECT * FROM foo_table_source");
Table appendingSourceTable = tEnv.fromDataStream(
    tEnv.toAppendStream(sourceTable, Types.ROW(
        new String[]{"field_1"}, new TypeInformation[]{Types.LONG()}))
);
tEnv.registerTable("foo_table", appendingSourceTable);

And the conversion to an Append Stream somewhat makes the planner behave
and there is only one DataSource in the execution plan.

But I feel like I'm just missing a simple option (on the SourceFunction, or
on the TableSource?) to invoke, to declare the Source as being
non-duplicatable.

I have tried a lot of options (uid(), operator chaining restrictions,
twiddling the transformation, forceNonParallel(), etc.), but can't quite find
how to do that! My SourceFunction is a RichSourceFunction.

At this point I'm wondering if this is a bug, or if it is a feature that
would have to be implemented.

Cheers,
Ben


Re: Implementing a tick service

2020-01-21 Thread Benoît Paris
Hello all!

Please disregard the last message; I used Thread.sleep() and Stateful
Source Functions
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#stateful-source-functions>
.
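
For the curious, a minimal sketch of what I ended up with -- a checkpointed
counter source that sleeps between emissions (the class name and details are
illustrative; the 200 ms interval is from the original goal):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class TickSource extends RichSourceFunction<Long> implements CheckpointedFunction {

    private final long intervalMs = 200L;
    private volatile boolean running = true;

    private transient ListState<Long> checkpointedCounter;
    private long counter = 0L;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (running) {
            synchronized (ctx.getCheckpointLock()) {
                counter++;
                ctx.collect(counter);
            }
            Thread.sleep(intervalMs);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedCounter.clear();
        checkpointedCounter.add(counter);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedCounter = context.getOperatorStateStore()
            .getListState(new ListStateDescriptor<>("counter", Long.class));
        for (Long restored : checkpointedCounter.get()) {
            counter = restored;
        }
    }
}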

But just out of curiosity, can processing-time Timers get rescheduled
inside the onTimer method?






On Mon, Jan 20, 2020 at 7:04 PM Benoît Paris <
benoit.pa...@centraliens-lille.org> wrote:

> Hello all :)!
>
> I'm having trouble creating a tick service.
>
> Goal: register a TableSource that emits a Row roughly every 200ms in
> processing time. The Row would contain only one column "counter" that is
> incremented by 1 each Row.
>
> Current attempt: Using TimerService
> A TableSource with
>
> public DataStream<Long> getDataStream(StreamExecutionEnvironment execEnv) {
>     return execEnv
>         .fromElements((Long) offset) // default 0L, one element
>         .keyBy(new NullByteKeySelector<>())
>         .process(new TickKeyedProcessFunction(200L))
>         .forceNonParallel();
> }
>
> And a KeyedProcessFunction with onTimer doing the heavy-lifting:
>
> public void processElement(Long value, Context context, Collector<Long> collector) throws IOException {
>     // called once
>     counter.update(value);
>     Long now = System.currentTimeMillis();
>     context.timerService().registerProcessingTimeTimer(now);
> }
>
> public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
>     Long then = timestamp + interval;
>     Long current = counter.value();
>     current++;
>     counter.update(current);
>     ctx.timerService().registerProcessingTimeTimer(then);
>     out.collect(current);
> }
>
> Now, the runtime tells me the Source is in FINISHED status. So obviously
> there must be limitations around re-scheduling one key inside onTimer.
>
> Is there a way to use the TimerService to go around that?
> Also, how would you implement this tick service by other means?
>
> Cheers
> Ben
>
>
>

-- 
Benoît Paris
Ingénieur Machine Learning Explicable
Tél : +33 6 60 74 23 00
http://benoit.paris
http://explicable.ml


Implementing a tick service

2020-01-20 Thread Benoît Paris
Hello all :)!

I'm having trouble creating a tick service.

Goal: register a TableSource that emits a Row roughly every 200ms in
processing time. The Row would contain only one column "counter" that is
incremented by 1 each Row.

Current attempt: Using TimerService
A TableSource with

public DataStream<Long> getDataStream(StreamExecutionEnvironment execEnv) {
    return execEnv
        .fromElements((Long) offset) // default 0L, one element
        .keyBy(new NullByteKeySelector<>())
        .process(new TickKeyedProcessFunction(200L))
        .forceNonParallel();
}

And a KeyedProcessFunction with onTimer doing the heavy-lifting:

public void processElement(Long value, Context context,
        Collector<Long> collector) throws IOException {
    // called once
    counter.update(value);
    Long now = System.currentTimeMillis();
    context.timerService().registerProcessingTimeTimer(now);
}

public void onTimer(long timestamp, OnTimerContext ctx,
        Collector<Long> out) throws Exception {
    Long then = timestamp + interval;
    Long current = counter.value();
    current++;
    counter.update(current);
    ctx.timerService().registerProcessingTimeTimer(then);
    out.collect(current);
}

Now, the runtime tells me the Source is in FINISHED status. So obviously
there must be limitations around re-scheduling one key inside onTimer.

Is there a way to use the TimerService to go around that?
Also, how would you implement this tick service by other means?

Cheers
Ben


Re: Controlling the Materialization of JOIN updates

2020-01-05 Thread Benoît Paris
Hi Kurt,

Thank you for your answer.

Yes both fact tables and dimension tables are changing over time; it was to
illustrate that they could change at the same time but that we could still
make a JOIN basically ignore updates from one specified side. The SQL is
not the actual one I'm using, and as you have said later on, I indeed don't
deal with a time attribute and just want what's in the table at processing
time.

At the moment my problem seems to be well on its way to being resolved, and
it is going to be Option 4: "LATERAL TABLE table_function" on the Blink
planner, as Jark Wu seems to be -elegantly- providing a patch for the
FLINK-14200 NPE bug:
https://github.com/apache/flink/pull/10763
It was indeed about shenanigans around finding the proper RelOptSchema. Ah, I
wish I had dived into the source code sooner; I could have had the pleasure
and opportunity to contribute to the Flink codebase.
Anyway, shout out to Jark for resolving the bug and providing a patch! I
believe this will be a real enabler for CQRS architectures on Flink (we had
subscriptions with regular joins, and this patch enables querying the same
thing with very minor SQL modifications)

Kind regards
Benoît


On Sat, Jan 4, 2020 at 4:22 AM Kurt Young  wrote:

> Hi Benoît,
>
> Before discussing all the options you listed, I'd like to understand more
> about your requirements.
>
> The part I don't fully understand is, both your fact (Event) and dimension
> (DimensionAtJoinTimeX) tables are
> coming from the same table, Event or EventRawInput in your case. So it
> will result that both your fact and
> dimension tables are changing with time.
>
> My understanding is, when your DimensionAtJoinTimeX table emits its
> results, you don't want to change the
> result again. You want the fact table to only join whatever data the
> dimension table currently has? I'm asking
> because your dimension table was calculated with a window aggregation, but
> your join logic doesn't seem to
> care about the time attribute (LEFT JOIN DimensionAtJoinTime1 d1 ON e.uid
> = d1.uid). It's possible that
> when a record with uid=x comes from Event table, but the dimension table
> doesn't have any data around
> uid=x yet due to the window aggregation. In this case, you don't want them
> to join?
>
> Best,
> Kurt
>
>
> On Fri, Jan 3, 2020 at 1:11 AM Benoît Paris <
> benoit.pa...@centraliens-lille.org> wrote:
>
>> Hello all!
>>
>> I'm trying to design a stream pipeline, and have trouble controlling when
>> a JOIN is triggering an update:
>>
>> Setup:
>>
>>- The Event table; "probe side", "query side", the result of earlier
>>stream processing
>>- The DimensionAtJoinTimeX tables; of updating nature, "build side",
>>the results of earlier stream processing
>>
>> Joining them:
>>
>> SELECT*
>> FROM  Event e
>> LEFT JOIN DimensionAtJoinTime1 d1
>>   ON  e.uid = d1.uid
>> LEFT JOIN DimensionAtJoinTime2 d2
>>   ON  e.uid = d2.uid
>>
>> The DimensionAtJoinTimeX Tables being the result of earlier stream
>> processing, possibly from the same Event table:
>>
>> SELECT   uid,
>>  hop_start(...),
>>  sum(...)
>> FROM Event e
>> GROUP BY uid,
>>  hop(...)
>>
>> The Event Table being:
>>
>> SELECT ...
>> FROM   EventRawInput i
>> WHERE  i.some_field = 'some_value'
>>
>> Requirements:
>>
>>- I need the JOINs to only be executed once, only when a new line is
>>appended to the probe / query / Event table.
>>- I also need the full pipeline to be defined in SQL.
>>- I very strongly prefer the Blink planner (mainly for Deduplication,
>>TopN and LAST_VALUE features).
>>
>> Problem exploration so far:
>>
>>- Option 1, "FOR SYSTEM_TIME AS OF" [1]: I need to have the solution
>>in SQL: it doesn't work out. But I might explore the following: insert
>>DimensionAtJoinTimeX into a special Sink, and use it in a
>>LookupableTableSource (I'm at a loss on how to do that, though. Do I need
>>an external kv store?).
>>- Option 2, "FOR SYSTEM_TIME AS OF" [1], used in SQL: Is there a
>>version of "FOR SYSTEM_TIME AS OF" readily usable in SQL? I might have
>>missed something in the documentation.
>>- Option 3, "LATERAL TABLE table_function" [2], on the Legacy
>>planner: It does not work with two tables [3], and I don't get to have the
>>Blink planner features.
>>- Option 4, "LATE

Re: [DISCUSS] Set default planner for SQL Client to Blink planner in 1.10 release

2020-01-03 Thread Benoît Paris
>  If anyone finds that blink planner has any significant defects and has a
larger regression than the old planner, please let us know.

Overall, the Blink-exclusive features are a must (TopN, deduplication,
LAST_VALUE, plan reuse, etc.)! But not all production use cases of the Legacy
planner are covered:
an edge case of Temporal Table Functions does not allow computed Tables (as
opposed to TableSources) to be used on the query side in Blink (
https://issues.apache.org/jira/browse/FLINK-14200)
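
(For context, until the default changes, explicitly selecting the Blink
planner in the Table API looks roughly like this -- a sketch for Flink
1.9/1.10:)

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .inStreamingMode()
    .build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);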

Cheers
Ben


On Fri, Jan 3, 2020 at 10:00 AM Jeff Zhang  wrote:

> +1, I have already made blink as the default planner of flink interpreter
> in Zeppelin
>
>
> On Fri, Jan 3, 2020 at 4:37 PM, Jingsong Li  wrote:
>
>> Hi Jark,
>>
>> +1 for default blink planner in SQL-CLI.
>> I believe this new planner can be put into practice in production.
>> We've worked hard for nearly a year, but the old planner didn't move on.
>>
>> And I'd like to cc to user@flink.apache.org.
>> If anyone finds that blink planner has any significant defects and has a
>> larger regression than the old planner, please let us know. We will be very
>> grateful.
>>
>> Best,
>> Jingsong Lee
>>
>> On Fri, Jan 3, 2020 at 4:14 PM Leonard Xu  wrote:
>>
>>> +1 for this.
>>> We bring many SQL/API features and enhance stability in the 1.10 release,
>>> and almost all of them happen in the Blink planner.
>>> SQL CLI is the most convenient entrypoint for me, I believe many users
>>> will have a better experience If we set Blink planner as default planner.
>>>
>>> Best,
>>> Leonard
>>>
>>> > On Jan 3, 2020, at 15:16, Terry Wang  wrote:
>>> >
>>> > Since what blink planner can do is a superset of flink planner, big +1
>>> for changing the default planner to Blink planner from my side.
>>> >
>>> > Best,
>>> > Terry Wang
>>> >
>>> >
>>> >
>>> >> On Jan 3, 2020, at 15:00, Jark Wu  wrote:
>>> >>
>>> >> Hi everyone,
>>> >>
>>> >> In 1.10 release, Flink SQL supports many awesome features and
>>> improvements,
>>> >> including:
>>> >> - support watermark statement and computed column in DDL
>>> >> - fully support all data types in Hive
>>> >> - Batch SQL performance improvements (TPC-DS 7x than Hive MR)
>>> >> - support INSERT OVERWRITE and INSERT PARTITION
>>> >>
>>> >> However, all the features and improvements are only available in the Blink
>>> >> planner, not in the Old planner.
>>> >> There are also some other features that are limited to the Blink planner, e.g.
>>> >> Dimension Table Join [1],
>>> >> TopN [2], Deduplicate [3], streaming aggregates optimization [4], and
>>> >> so on.
>>> >>
>>> >> But the Old planner is still the default planner in Table API & SQL. It is
>>> >> frustrating for users to have to set
>>> >> the blink planner manually every time they start a SQL CLI. And it's
>>> >> surprising to see an unsupported
>>> >> exception if they try out the new features but haven't switched planner.
>>> >>
>>> >> SQL CLI is a very important entrypoint for trying out new features
>>> and
>>> >> prototyping for users.
>>> >> In order to give the new planner more exposure, I would like to suggest
>>> >> setting the
>>> >> default planner
>>> >> for SQL Client to the Blink planner before the 1.10 release.
>>> >>
>>> >> The approach is just changing the default SQL CLI yaml
>>> configuration[5]. In
>>> >> this way, the existing
>>> >> environment is still compatible and unaffected.
>>> >>
>>> >> Changing the default planner for the whole Table API & SQL is another
>>> topic
>>> >> and is out of scope of this discussion.
>>> >>
>>> >> What do you think?
>>> >>
>>> >> Best,
>>> >> Jark
>>> >>
>>> >> [1]:
>>> >>
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
>>> >> [2]:
>>> >>
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#top-n
>>> >> [3]:
>>> >>
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
>>> >> [4]:
>>> >>
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tuning/streaming_aggregation_optimization.html
>>> >> [5]:
>>> >>
>>> https://github.com/apache/flink/blob/master/flink-table/flink-sql-client/conf/sql-client-defaults.yaml#L100
>>> >
>>>
>>>
>>
>> --
>> Best, Jingsong Lee
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


-- 
Benoît Paris
Ingénieur Machine Learning Explicable
Tél : +33 6 60 74 23 00
http://benoit.paris
http://explicable.ml


Controlling the Materialization of JOIN updates

2020-01-02 Thread Benoît Paris
Hello all!

I'm trying to design a stream pipeline, and have trouble controlling when a
JOIN is triggering an update:

Setup:

   - The Event table; "probe side", "query side", the result of earlier
   stream processing
   - The DimensionAtJoinTimeX tables; of updating nature, "build side", the
   results of earlier stream processing

Joining them:

SELECT*
FROM  Event e
LEFT JOIN DimensionAtJoinTime1 d1
  ON  e.uid = d1.uid
LEFT JOIN DimensionAtJoinTime2 d2
  ON  e.uid = d2.uid

The DimensionAtJoinTimeX Tables being the result of earlier stream
processing, possibly from the same Event table:

SELECT   uid,
 hop_start(...),
 sum(...)
FROM Event e
GROUP BY uid,
 hop(...)

The Event Table being:

SELECT ...
FROM   EventRawInput i
WHERE  i.some_field = 'some_value'

Requirements:

   - I need the JOINs to only be executed once, only when a new line is
   appended to the probe / query / Event table.
   - I also need the full pipeline to be defined in SQL.
   - I very strongly prefer the Blink planner (mainly for Deduplication,
   TopN and LAST_VALUE features).

Problem exploration so far:

   - Option 1, "FOR SYSTEM_TIME AS OF" [1]: I need to have the solution in
   SQL: it doesn't work out. But I might explore the following: insert
   DimensionAtJoinTimeX into a special Sink, and use it in a
   LookupableTableSource (I'm at a loss on how to do that, though. Do I need
   an external kv store?).
   - Option 2, "FOR SYSTEM_TIME AS OF" [1], used in SQL: Is there a version
   of "FOR SYSTEM_TIME AS OF" readily usable in SQL? I might have missed
   something in the documentation (see the sketch after this list).
   - Option 3, "LATERAL TABLE table_function" [2], on the Legacy planner:
   It does not work with two tables [3], and I don't get to have the Blink
   planner features.
   - Option 4, "LATERAL TABLE table_function" [2], on the Blink planner: It
   does not work with the "probe side" being the results of earlier stream
   processing [4].
   - Option 5, let a regular JOIN materialize the updates, and somehow find
   how to filter the ones coming from the build sides (I'm at a loss on how to
   do that).
   - Option 6, "TVR": I read this paper [5], which mentions "Time-Varying
   Relation"s; Speculating here: could there be a way, to say that the build
   side is not a TVR. Aka declare the stream as being somehow "static", while
   still being updated (but I guess we're back to "FOR SYSTEM_TIME AS OF").
   - Option 7: Is there some features being developed, or hints, or
   workarounds to control the JOIN updates that I have not considered so far?
   - Remark 1: I believe that FLINK-15112 and FLINK-14200 are of the same
   bug nature, even though they occur in different situations on different
   planners (same Exception Stack Trace on files that have the same historical
   parent before the Blink fork). FLINK-15112 has a workaround, but
   FLINK-14200 does not. The existence of that workaround IMHO signals that
   there is a simple fix for both bugs. I have tried to find it in Flink for a
   few days, but no success so far. If you guys have pointers helping me
   provide a fix, I'll gladly listen. So far I have progressed to: It revolves
   around Calcite-based Flink streaming rules transforming a temporal table
   function correlate into a Join on 2*Scan, and crashes when it encounters
   something that is not a table that can be readily scanned. Also, there are
   shenanigans on trying to find the right schema in the Catalog. But I am
   blocked now, and not accustomed to the Flink internal code (would like to
   though, if Alibaba/Ververica are recruiting remote workers, wink wink,
   nudge nudge).
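
Regarding Options 1 and 2, the sketch mentioned above: as far as I
understand, the Blink planner accepts "FOR SYSTEM_TIME AS OF" directly in SQL
when the build side is backed by a LookupableTableSource. Roughly (assuming a
processing-time attribute named proctime on Event, and DimensionAtJoinTime1
exposed as a lookupable table):

SELECT e.*, d1.*
FROM   Event AS e
JOIN   DimensionAtJoinTime1 FOR SYSTEM_TIME AS OF e.proctime AS d1
  ON   e.uid = d1.uid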

All opinions very much welcomed on all Options and Remarks!

Cheers, and a happy new year to all,
Benoît

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.html#temporal-table

[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/joins.html#processing-time-temporal-joins

[3] https://issues.apache.org/jira/browse/FLINK-15112

[4] https://issues.apache.org/jira/browse/FLINK-14200

[5] https://arxiv.org/pdf/1905.12133.pdf


Re: Flink ML feature

2019-12-10 Thread Benoît Paris
Is there any information as to whether Alink is going to be contributed to
Apache Flink as the official ML Lib?


On Tue, Dec 10, 2019 at 7:11 AM vino yang  wrote:

> Hi Chandu,
>
> AFAIK, there is a project named Alink[1] which is the Machine Learning
> algorithm platform based on Flink, developed by the PAI team of Alibaba
> computing platform. FYI
>
> Best,
> Vino
>
> [1]: https://github.com/alibaba/Alink
>
> On Tue, Dec 10, 2019 at 2:07 PM, Tom Blackwood  wrote:
>
>> You may try Spark ML, which is a production ready library for ML stuff.
>>
>> regards.
>>
>> On Tue, Dec 10, 2019 at 1:04 PM chandu soa  wrote:
>>
>>> Hello Community,
>>>
>>> Can you please give me some pointers for implementing Machine Learning
>>> using Flink.
>>>
>>> I see the Flink ML libraries were dropped in v1.9. It looks like the ML
>>> feature in Flink is going to be enhanced.
>>>
>>> What is the recommended approach for implementing production-grade
>>> ML-based apps using Flink? Is v1.9 ok, or should I wait for 1.10?
>>>
>>> Thanks,
>>> Chandu
>>>
>>

-- 
Benoît Paris
Ingénieur Machine Learning Explicable
Tél : +33 6 60 74 23 00
http://benoit.paris
http://explicable.ml


Re: Joining multiple temporal tables

2019-12-06 Thread Benoît Paris
t
>>> org.apache.flink.table.plan.rules.logical.LogicalCorrelateToTemporalTableJoinRule.onMatch(LogicalCorrelateToTemporalTableJoinRule.scala:91)
>>> at
>>> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
>>> at
>>> org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560)
>>> at
>>> org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419)
>>> at
>>> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:284)
>>> at
>>> org.apache.calcite.plan.hep.HepInstruction$RuleCollection.execute(HepInstruction.java:74)
>>> at
>>> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215)
>>> at
>>> org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202)
>>> at
>>> org.apache.flink.table.plan.Optimizer.runHepPlanner(Optimizer.scala:228)
>>> at
>>> org.apache.flink.table.plan.Optimizer.runHepPlannerSimultaneously(Optimizer.scala:212)
>>> at
>>> org.apache.flink.table.plan.Optimizer.optimizeExpandPlan(Optimizer.scala:138)
>>> at
>>> org.apache.flink.table.plan.StreamOptimizer.optimize(StreamOptimizer.scala:61)
>>> at
>>> org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:410)
>>> at
>>> org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:187)
>>> at
>>> org.apache.flink.table.planner.StreamPlanner.$anonfun$translate$1(StreamPlanner.scala:127)
>>> at
>>> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>>> at scala.collection.Iterator.foreach(Iterator.scala:937)
>>> at scala.collection.Iterator.foreach$(Iterator.scala:937)
>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>>> at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>>> at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>> at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>>> at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>>> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>> at
>>> org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:127)
>>> at
>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:319)
>>> at
>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:227)
>>> at
>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:218)
>>> at test.PojoTest.run(PojoTest.java:96)
>>> at test.PojoTest.main(PojoTest.java:23)
>>>
>>

-- 
Benoît Paris
Ingénieur Machine Learning Explicable
Tél : +33 6 60 74 23 00
http://benoit.paris
http://explicable.ml


Re: 【flink sql】table in subquery join temporal table raise java.lang.NullPointerException

2019-10-14 Thread Benoît Paris
e
> public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames,
>         TypeInformation<?>[] fieldTypes) {
>     return this;
> }
> @Override
> public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
>     consumeDataStream(dataStream);
> }
> @Override
> public TypeInformation<Row> getRecordType() {
>     return Types.ROW(getFieldNames(), getFieldTypes());
> }
> @Override
> public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(
>         DataStream<Tuple2<Boolean, Row>> dataStream) {
>     return dataStream.addSink(new SysoSinkFunction<Tuple2<Boolean, Row>>());
> }
> }
>
> @SuppressWarnings("serial")
> public static class SysoSinkFunction<T> implements SinkFunction<T> {
>     @Override
>     public void invoke(T value) throws Exception {
>         System.out.println(value);
>     }
> }
>
> public static class FooSource implements StreamTableSource<Row>,
>         DefinedProctimeAttribute {
>
>     String[] fieldNames;
>
>     public FooSource(String[] fieldNames) {
>         this.fieldNames = fieldNames;
>     }
>
>     @Override
>     public TableSchema getTableSchema() {
>         return new TableSchema(fieldNames, new TypeInformation[]
>                 {Types.STRING(), Types.LONG(), Types.SQL_TIMESTAMP()});
>     }
>
>     @Override
>     public TypeInformation<Row> getReturnType() {
>         return Types.ROW(fieldNames, new TypeInformation[]
>                 {Types.STRING(), Types.LONG(), Types.SQL_TIMESTAMP()});
>     }
>
>     @Override
>     public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
>         return execEnv.addSource(new SourceFunction<Row>() {
>
>             @Override
>             public void run(SourceContext<Row> ctx) throws Exception {
>                 Random random = new Random();
>
>                 while (true) {
>
>                     Row row = new Row(3);
>                     row.setField(0, "Euro" + random.nextLong() % 3);
>                     row.setField(1, random.nextLong() % 200);
>                     row.setField(2, new java.sql.Timestamp(System.currentTimeMillis()));
>                     ctx.collect(row);
>                     Thread.sleep(100);
>                 }
>
>             }
>
>             @Override
>             public void cancel() {
>                 System.out.println("cancelling --");
>             }
>         }, getReturnType());
>     }
>
>     @Override
>     public String getProctimeAttribute() {
>         return fieldNames[2];
>     }
> }
>
> }
>

-- 
Benoît Paris
Ingénieur Machine Learning Explicable
Tél : +33 6 60 74 23 00
http://benoit.paris
http://explicable.ml


Re: Random forest - Flink ML

2019-03-12 Thread Benoît Paris
There have been some developments at Apache SAMOA towards a forest of
decision trees.

This is not a regular Random Forest, but a form of tree that can be
incrementally learned fast. If I recall correctly they have adaptive
algorithms as well. Here are some resources:

*  VHT: Vertical Hoeffding Tree

*  Apache SAMOA

Now I don't know the status of the project, nor have I ever tried SAMOA; but
this is something that could fit your needs.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/