Hmm, that is strange behavior, and unexpected (to me).
Flink optimizes Table API / SQL queries when a Table is converted into
a DataStream (or DataSet) or emitted to a TableSink.
So, given that you convert the result tables in addSink() into a DataStream
and write them to a sink function, they should not share any operators or
plan fragments, especially because you are registering a new table for each
query (Calcite's optimizer does not even know that there is something to
share).

Maybe the problem occurs at the level of the DataStream API, where some
operators are chained?
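
To make the savepoint side of this concrete, here is a toy model in plain
Java (no Flink dependency). It is only a sketch under stated assumptions:
the class SavepointIdSketch, its Op type, and the string-based "auto" IDs
are all hypothetical, and Flink's real ID generation differs in detail. The
point it illustrates is that an ID derived from an operator and its inputs
changes when a shared upstream operator changes, while an explicitly
assigned uid does not.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model only: NOT Flink's API and NOT its real ID-hashing algorithm.
final class SavepointIdSketch {

    /** A hypothetical operator with an optional explicit uid. */
    static final class Op {
        final String uid;          // null means "derive an ID automatically"
        final String description;  // stands in for the operator's logic
        final List<Op> inputs = new ArrayList<>();

        Op(String uid, String description, Op... inputs) {
            this.uid = uid;
            this.description = description;
            for (Op in : inputs) {
                this.inputs.add(in);
            }
        }

        /** Explicit uid if set; otherwise derived from description and input IDs. */
        String id() {
            if (uid != null) {
                return uid;
            }
            StringBuilder sb = new StringBuilder("auto(" + description);
            for (Op in : inputs) {
                sb.append(" <- ").append(in.id());
            }
            return sb.append(")").toString();
        }
    }

    /** Builds source -> shared scan -> one branch per filter (assumed plan shape). */
    static List<Op> buildJob(boolean withUids, String... filters) {
        Op source = new Op(withUids ? "kafka-source" : null, "kafka");
        Op scan = new Op(withUids ? "shared-scan" : null,
                "scan WHERE " + String.join(" OR ", filters), source);
        List<Op> job = new ArrayList<>();
        job.add(source);
        job.add(scan);
        for (int i = 0; i < filters.length; i++) {
            job.add(new Op(withUids ? "query-" + (i + 1) : null, "query " + (i + 1), scan));
        }
        return job;
    }

    /** Collects the IDs under which a job's state would be stored. */
    static Set<String> ids(List<Op> job) {
        Set<String> result = new HashSet<>();
        for (Op op : job) {
            result.add(op.id());
        }
        return result;
    }

    /** Restore succeeds only if every ID in the savepoint exists in the new job. */
    static boolean canRestore(Set<String> savepointIds, List<Op> newJob) {
        return ids(newJob).containsAll(savepointIds);
    }

    public static void main(String[] args) {
        Set<String> autoSavepoint = ids(buildJob(false, "f1", "f2"));
        Set<String> uidSavepoint = ids(buildJob(true, "f1", "f2"));
        // Adding a third query changes the shared scan, so derived IDs no longer match.
        System.out.println(canRestore(autoSavepoint, buildJob(false, "f1", "f2", "f3")));
        // With explicit uids, the old IDs are still present in the new job.
        System.out.println(canRestore(uidSavepoint, buildJob(true, "f1", "f2", "f3")));
    }
}
```

In this model the first check fails and the second succeeds, which mirrors
why setting uid on each DataStream operator keeps a savepoint restorable
when the job changes around it.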

Can you share the code of your addSink() function?

Thanks, Fabian



2018-03-16 15:53 GMT+01:00 Juho Autio <juho.au...@rovio.com>:

> Hi, have there been any changes to state handling with Flink SQL? Anything
> planned?
>
> I didn't find it at
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html.
>
>
> Recently I ran into problems when trying to restore the state after changes
> that I thought wouldn't change the execution plan.
>
> This is what I did basically:
>
> SingleOutputStreamOperator kafkaStream = createKafkaStream(env);
>
> registerTable("events1", kafkaStream, tableEnv);
> Table result1 = tableEnv.sql(sql1);
> addSink(result1);
>
> registerTable("events2", kafkaStream, tableEnv);
> Table result2 = tableEnv.sql(sql2);
> addSink(result2);
>
>
> Now, even though I created two different tables from kafkaStream, they end
> up in the same StreamTableEnvironment and Flink optimizes the plan, combining
> common parts of sql1 & sql2 :) That's of course nice when the stream DAG
> doesn't need to be evolved. But if I wanted to force a "split" after the
> kafkaStream, is there any way to do so without having to read from Kafka
> multiple times?
>
> If I now add this:
>
> registerTable("events3", kafkaStream, tableEnv);
> Table result3 = tableEnv.sql(sql3);
> addSink(result3);
>
> ..restoring from the savepoint fails, because Flink pushes, for example,
> filters (WHERE conditions) from this new query into the first common block
> of the SQL execution plan, and adding another branch to the DAG changes the
> common block, too. So even though this change looked like I was only adding
> something new, it also touched the existing parts of the DAG.
>
> I ended up using the Java API, where I define operators explicitly, even
> setting uid on each operator to ensure savepoint compatibility. I would hope
> to be able to use SQL and the Table API instead, if it were also possible to
> restore state after this kind of incremental change.
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
