Re: Questions about the ChangelogNormalize operator in the stages generated by the Flink SQL upsert-kafka connector

2021-03-04 Post by Qishang
Hi Jark.
I have two questions about the upsert-kafka connector:

1. upsert-kafka has no `scan.startup.*` options for setting the start offset like the
kafka connector does; in my tests it always starts from earliest.
2. The intermediate ChangelogNormalize operator amplifies the data volume: one input
record becomes 2 records after passing through ChangelogNormalize. Why is that?


Qishang wrote on Fri, Mar 5, 2021 at 11:14 AM:

>
> For certain reasons the upstream kafka topic has only one partition, and the business
> logic is mostly dimension-table joins or UDFs calling APIs, so this is very nice.
> Learned something new, thanks.
>
> Jark Wu wrote on Thu, Mar 4, 2021 at 11:11 PM:
>
>> 1. ChangelogNormalize is added by default for upsert-kafka.
>> 2. ChangelogNormalize runs with the environment parallelism, so it can break through the
>> parallelism limit you mentioned. It also works for kafka + canal-json, but you have to
>> enable it with the option table.exec.source.cdc-events-duplicate = true [1]. Note, however,
>> that ChangelogNormalize is a stateful node with its own performance cost, so the overall
>> performance may end up worse than a plain forward.
>>
>> Best,
>> Jark
>>
>> [1]:
>>
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/config/#table-exec-source-cdc-events-duplicate
>>
>> On Thu, 4 Mar 2021 at 15:30, Qishang  wrote:
>>
>> > Hi community,
>> > Flink 1.12.1
>> >
>> > Our current pipeline ingests data from kafka via canal-json and writes it to a DB.
>> > Since the topic has only 1 partition, setting a higher parallelism has no effect for
>> > an ETL job that contains only forward operators.
>> >
>> > insert into table_a select id,udf(a),b,c from table_b;
>> >
>> > I noticed that the plan produced by EXPLAIN for the upsert-kafka connector contains a
>> > ChangelogNormalize stage that can be partitioned.
>> > 1. Is ChangelogNormalize added by default, or can it be configured somewhere?
>> > 2. Does it lift the parallelism limit imposed by the number of Kafka partitions only
>> > for upsert-kafka? Can it be used in the scenario I described above?
>> >
>> > ```
>> > == Physical Execution Plan ==
>> > Stage 1 : Data Source
>> > content : Source: TableSourceScan(table=[[default_catalog,
>> > default_database, temp_table]], fields=[id...])
>> >
>> > Stage 3 : Operator
>> > content : ChangelogNormalize(key=[id])
>> > ship_strategy : HASH
>> >
>> > Stage 4 : Operator
>> > content : Calc(select=[...])
>> > ship_strategy : FORWARD
>> >
>> > Stage 5 : Data Sink
>> > content : Sink: Sink(table=[default_catalog.default_database.table_a],
>> > fields=[id...])
>> > ship_strategy : FORWARD
>> > ```
>> >
>>
>


Re: Order of the -s state restore parameter

2021-03-04 Post by allanqinjy
Anything placed after the jar is treated as a program argument of the jar itself. You can
try reading the parameter in your main method instead; the value of s is then your path.
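A minimal sketch of what that looks like (class name and printout are illustrative only; a value read this way is just a program argument, the CLI will not restore from the savepoint automatically unless -s appears before the jar):

```
import org.apache.flink.api.java.utils.ParameterTool;

public class Main {
    public static void main(String[] args) throws Exception {
        // With `flink run -c class test.jar -s path`, the trailing "-s path" is handed to the
        // program as arguments instead of being parsed by the CLI.
        ParameterTool params = ParameterTool.fromArgs(args);
        String savepointPath = params.get("s"); // the value that followed -s
        System.out.println("savepoint path argument: " + savepointPath);
    }
}
```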




On 2021-03-05 14:57, dushang<1823103...@qq.com> wrote:
Does -s have to come before the jar in ../bin/flink run -s path -c class test.jar? If I
change it to ../bin/flink run -c class test.jar -s path, it has no effect.





Order of the -s state restore parameter

2021-03-04 Post by dushang
Does -s have to come before the jar in ../bin/flink run -s path -c class test.jar? If I
change it to ../bin/flink run -c class test.jar -s path, it has no effect.





Re: Re: How to optimize a message backlog

2021-03-04 Post by Michael Ran



Typically the operators form a chain a->b->c->d->e->f. If f cannot keep up, a through e will show backpressure, so look at the operator just downstream of the last backpressured one. Beyond that, it comes down to understanding the operators and experience.
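One trick that can help narrow this down (my own suggestion, not something from the reply above): temporarily disable operator chaining so every operator becomes its own task and the Web UI shows backpressure and numRecordsIn/Out per operator instead of per chain. A minimal sketch:

```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BackpressureDebugJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Put every operator in its own task so per-operator backpressure and record counts
        // are visible in the Web UI. This adds serialization overhead, so use it only while
        // debugging and remove it again afterwards.
        env.disableOperatorChaining();

        // Placeholder pipeline; replace with the real a->b->c->d->e->f job.
        env.fromElements(1, 2, 3, 4, 5)
           .filter(x -> x % 2 == 1)
           .print();

        env.execute("backpressure-debugging");
    }
}
```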

On 2021-03-05 14:39:40, "allanqinjy" wrote:
>Hello,
>
> The consumption rate cannot keep up with the write rate. Tuning the parallelism does improve
> consumption, but we cannot keep relying on raising the parallelism as the only optimization.
> You said to look at which operator has the problem; can you be more specific? How do I tell
> that an operator has a problem? Once backpressure sets in, the backpressured operators all
> look normal and only the source shows "high", because backpressure propagates back to the
> source stage by stage. The operator metrics only show the records coming in and going out,
> and the volume a sink writes to the external system probably cannot be seen in the Flink UI
> either. So how do you pinpoint which operator is actually the problem? Do you have a good
> method for this?
>
>
>
>
>On 2021-03-05 14:22, Michael Ran wrote:
>Look at which operator is the problem and increase that operator's parallelism, or optimize
>how that operator executes. Find the root cause first.
>On 2021-03-05 11:05:14, "allanqinjy" wrote:
>
>
>hi,
>The message queue volume is very large, and after ingesting it into Flink the job is heavily
>backpressured. Apart from changing the parallelism, are there any other ways to optimize this?
>


Re: How to optimize a message backlog

2021-03-04 Post by allanqinjy
Hello,

The consumption rate cannot keep up with the write rate. Tuning the parallelism does improve
consumption, but we cannot keep relying on raising the parallelism as the only optimization.
You said to look at which operator has the problem; can you be more specific? How do I tell
that an operator has a problem? Once backpressure sets in, the backpressured operators all look
normal and only the source shows "high", because backpressure propagates back to the source
stage by stage. The operator metrics only show the records coming in and going out, and the
volume a sink writes to the external system probably cannot be seen in the Flink UI either. So
how do you pinpoint which operator is actually the problem? Do you have a good method for this?




On 2021-03-05 14:22, Michael Ran wrote:
Look at which operator is the problem and increase that operator's parallelism, or optimize
how that operator executes. Find the root cause first.
On 2021-03-05 11:05:14, "allanqinjy" wrote:


hi,
The message queue volume is very large, and after ingesting it into Flink the job is heavily
backpressured. Apart from changing the parallelism, are there any other ways to optimize this?



Re: How to optimize a message backlog

2021-03-04 Post by Michael Ran
Look at which operator is the problem and increase that operator's parallelism, or optimize
how that operator executes. Find the root cause first.
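For reference, the parallelism can be raised for just the bottleneck operator rather than the whole job; a minimal sketch (the operator and the numbers are made up):

```
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PerOperatorParallelism {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2); // default parallelism for the job

        env.fromElements("a", "b", "c")
           .map(new MapFunction<String, String>() {
               @Override
               public String map(String value) {
                   return value.toUpperCase(); // stand-in for the expensive per-record work
               }
           })
           .setParallelism(8) // scale only the slow operator
           .print();

        env.execute("per-operator-parallelism");
    }
}
```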
On 2021-03-05 11:05:14, "allanqinjy" wrote:
>
>
>hi,
>The message queue volume is very large, and after ingesting it into Flink the job is heavily
>backpressured. Apart from changing the parallelism, are there any other ways to optimize this?
>


Re: Question about using yarn.containers.vcores

2021-03-04 Post by 阿华田
It works now. Flink 1.11: whether the parameter took effect can be seen in the configuration
section of the JobManager UI page.




On 2021-03-05 11:44, Xintong Song wrote:
Which Flink version are you using?
Is the deployment mode per-job or session?
Where exactly did you see that "the configured parameter took effect"?

Thank you~

Xintong Song



On Thu, Mar 4, 2021 at 4:35 PM 阿华田  wrote:

I use -yD yarn.containers.vcores=4
to set the total number of CPU cores for the Flink job. The configured parameter seems to take
effect, but the number of cores actually requested is still one CPU per slot.
When using yarn.containers.vcores, do I also need to enable CPU scheduling in YARN?




Re: Flink 1.12: how to submit a batch SQL job programmatically

2021-03-04 Post by shougou
Thanks to Qishang for the reply!

My main goal is to submit SQL to a running Flink cluster. Because I need windowed writes into
a Hive table, I can only submit the SQL as a batch job.

For streaming, I submit SQL like this and it reaches the Flink cluster:

final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createRemoteEnvironment("tkbd01", 8081);
// StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);
// TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);

But this is a *StreamExecutionEnvironment, which cannot run in batch mode*.
The contradiction is: if I use TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);
to run batch SQL, then there is *no way to create that TableEnvironment from the final
StreamExecutionEnvironment env* above.
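For what it is worth, the pattern I would try here (an assumption on my part, not confirmed in this thread) is to drop createRemoteEnvironment for the batch case: build the job with the unified TableEnvironment in batch mode, package it as a jar, and submit that jar to the remote cluster with something like bin/flink run -m tkbd01:8081 your-job.jar, so the cluster address comes from the CLI rather than from the environment. A minimal sketch (table names are hypothetical):

```
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BatchSqlJob {
    public static void main(String[] args) {
        // Batch SQL in 1.12 goes through the unified TableEnvironment; there is no
        // StreamExecutionEnvironment underneath it.
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // Hypothetical statement; in practice this is the windowed insert into the Hive table.
        tEnv.executeSql("INSERT INTO hive_sink SELECT id, name FROM source_table");
    }
}
```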







Re: Question about using yarn.containers.vcores

2021-03-04 Post by Xintong Song
Which Flink version are you using?
Is the deployment mode per-job or session?
Where exactly did you see that "the configured parameter took effect"?

Thank you~

Xintong Song



On Thu, Mar 4, 2021 at 4:35 PM 阿华田  wrote:

> I use -yD yarn.containers.vcores=4
> to set the total number of CPU cores for the Flink job. The configured parameter seems to
> take effect, but the number of cores actually requested is still one CPU per slot.
> When using yarn.containers.vcores, do I also need to enable CPU scheduling in YARN?
>
>


Unsubscribe

2021-03-04 Post by Gauler Tan



Re: Questions about the ChangelogNormalize operator in the stages generated by the Flink SQL upsert-kafka connector

2021-03-04 Post by Qishang
For certain reasons the upstream kafka topic has only one partition, and the business logic is
mostly dimension-table joins or UDFs calling APIs, so this is very nice.
Learned something new, thanks.

Jark Wu wrote on Thu, Mar 4, 2021 at 11:11 PM:

> 1. ChangelogNormalize is added by default for upsert-kafka.
> 2. ChangelogNormalize runs with the environment parallelism, so it can break through the
> parallelism limit you mentioned. It also works for kafka + canal-json, but you have to
> enable it with the option table.exec.source.cdc-events-duplicate = true [1]. Note, however,
> that ChangelogNormalize is a stateful node with its own performance cost, so the overall
> performance may end up worse than a plain forward.
>
> Best,
> Jark
>
> [1]:
>
> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/config/#table-exec-source-cdc-events-duplicate
>
> On Thu, 4 Mar 2021 at 15:30, Qishang  wrote:
>
> > Hi community,
> > Flink 1.12.1
> >
> > Our current pipeline ingests data from kafka via canal-json and writes it to a DB. Since
> > the topic has only 1 partition, setting a higher parallelism has no effect for an ETL job
> > that contains only forward operators.
> >
> > insert into table_a select id,udf(a),b,c from table_b;
> >
> > I noticed that the plan produced by EXPLAIN for the upsert-kafka connector contains a
> > ChangelogNormalize stage that can be partitioned.
> > 1. Is ChangelogNormalize added by default, or can it be configured somewhere?
> > 2. Does it lift the parallelism limit imposed by the number of Kafka partitions only for
> > upsert-kafka? Can it be used in the scenario I described above?
> >
> > ```
> > == Physical Execution Plan ==
> > Stage 1 : Data Source
> > content : Source: TableSourceScan(table=[[default_catalog,
> > default_database, temp_table]], fields=[id...])
> >
> > Stage 3 : Operator
> > content : ChangelogNormalize(key=[id])
> > ship_strategy : HASH
> >
> > Stage 4 : Operator
> > content : Calc(select=[...])
> > ship_strategy : FORWARD
> >
> > Stage 5 : Data Sink
> > content : Sink: Sink(table=[default_catalog.default_database.table_a],
> > fields=[id...])
> > ship_strategy : FORWARD
> > ```
> >
>


Re: How to use async I/O for dimension table lookups in Flink SQL?

2021-03-04 Post by Leonard Xu
Currently none of the connectors in Flink SQL implement async I/O dimension-table lookups, but
the interface already supports it. If you want to implement it yourself, you can refer to [1].
Also, the community is working on async I/O lookup support for the HBase connector, which is
expected to be usable in 1.13 [2].

Best regards,

[1]https://github.com/apache/flink/blob/73cdd3d0d9f6a807b3e47c09eef7983c9aa180c7/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java#L618
[2] https://github.com/apache/flink/pull/14684#pullrequestreview-604148209 
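In case it helps, a minimal sketch of what such a custom async lookup function could look like (the class name, key type, and external call are all made up; the connector also has to expose it through its LookupTableSource, see [1] for a complete example):

```
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.types.Row;

// Hypothetical async dimension lookup; client setup and error handling are kept to a minimum.
public class MyAsyncDimLookup extends AsyncTableFunction<Row> {

    // The planner calls eval(...) with a future that must be completed asynchronously.
    public void eval(CompletableFuture<Collection<Row>> resultFuture, Integer id) {
        CompletableFuture
                .supplyAsync(() -> queryExternalSystem(id)) // ideally use a truly async client
                .whenComplete((rows, error) -> {
                    if (error != null) {
                        resultFuture.completeExceptionally(error);
                    } else {
                        resultFuture.complete(rows);
                    }
                });
    }

    private Collection<Row> queryExternalSystem(Integer id) {
        // Placeholder for the real HBase/JDBC/HTTP lookup.
        return Collections.singletonList(Row.of(id, "dim-value"));
    }
}
```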




> On Mar 4, 2021, at 14:22, HunterXHunter <1356469...@qq.com> wrote:
> 
> Define a source table
> 
> 
> 



How to optimize a message backlog

2021-03-04 Post by allanqinjy


hi,
The message queue volume is very large, and after ingesting it into Flink the job is heavily
backpressured. Apart from changing the parallelism, are there any other ways to optimize this?



Re: How to use async I/O for dimension table lookups in Flink SQL?

2021-03-04 Post by peibin wang
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/legacy.html#temporal-table-function-join
I believe Flink SQL's temporal-table joins all look up the dimension table via async I/O.
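For reference, this is the join syntax in question (whether the lookup actually runs asynchronously depends on the connector's lookup runtime provider); a minimal sketch with hypothetical table names, assuming orders has a processing-time attribute proc_time and dim_user comes from a lookup-capable connector:

```
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class LookupJoinExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // DDL for orders, dim_user and enriched_orders is omitted; all three are assumed to exist.
        tEnv.executeSql(
                "INSERT INTO enriched_orders "
                        + "SELECT o.order_id, o.user_id, u.user_name "
                        + "FROM orders AS o "
                        + "JOIN dim_user FOR SYSTEM_TIME AS OF o.proc_time AS u "
                        + "ON o.user_id = u.user_id");
    }
}
```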

casel.chen wrote on Wed, Mar 3, 2021 at 10:54 PM:

> How do I use async I/O for dimension table lookups in Flink SQL? Is there anything about it
> in the official docs?


How to optimize consumption when the message queue volume is extremely large

2021-03-04 Post by allanqinjy


hi,
The message queue volume is very large, and after ingesting it into Flink the job is heavily
backpressured. Apart from changing the parallelism, are there any other ways to optimize this?





Re: Flink 1.12: how to submit a batch SQL job programmatically

2021-03-04 Post by Qishang
Hi shougou.

Is this what you are looking for? [1]

// ********************
// BLINK BATCH QUERY
// ********************
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings bbSettings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html#create-a-tableenvironment


shougou <80562...@qq.com> wrote on Fri, Mar 5, 2021 at 9:59 AM:

> We know that in 1.12 you can submit a batch job using the Table API, for example:
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>
> But what about submitting a SQL job:
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
> Table result = tableEnv.sqlQuery(...);
> The docs do not explain how to run batch SQL with a StreamTableEnvironment, or whether to
> use BatchTableEnvironment instead.
>
> Thanks for any ideas!
>
>
>
>
>
>


How do flink + iceberg/hudi perform with very large data volumes?

2021-03-04 Post by Husky Zeng
For a large table at the scale of hundreds of billions of rows, with upserts at the scale of
hundreds of millions per hour, can iceberg hold up? Is hudi better suited for this?

Also, iceberg does not seem to support upsert through SQL, only programmatically.




Flink 1.12: how to submit a batch SQL job programmatically

2021-03-04 Post by shougou
We know that in 1.12 you can submit a batch job using the Table API, for example:
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

But what about submitting a SQL job:
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Table result = tableEnv.sqlQuery(...);
The docs do not explain how to run batch SQL with a StreamTableEnvironment, or whether to use
BatchTableEnvironment instead.

Thanks for any ideas!







Flink job error: connection to remote task manager lost

2021-03-04 Post by ????
The Flink job reports the following error:

org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
Connection unexpectedly closed by remote task manager 
'cdh-slave-01/130.0.0.1:41800'. This might indicate that the remote task 
manager was lost.

Unsubscribe

2021-03-04 Post by 陈刚
Unsubscribe

Re: Problem with dynamically changing options in SQL

2021-03-04 Post by Jark Wu
This looks like a bug in node reuse during multi-sink plan optimization. Please open an issue
in JIRA for it.

Best,
Jark

On Thu, 4 Mar 2021 at 19:37, 酷酷的浑蛋  wrote:

> StatementSet statementSet = tableEnvironment.createStatementSet();
> String sql1 = "insert into test select a,b,c from test_a_12342 /*+
> OPTIONS('table-name'='test_a_1')*/";
> String sql2 = "insert into test select a,b,c from test_a_12342 /*+
> OPTIONS('table-name'='test_a_2')*/";
> statementSet.addInsertSql(sql1);
> statementSet.addInsertSql(sql2);
> statementSet.execute();
>
>
> With the SQL above, the final result is that the data of table test_a_1 is inserted twice,
> while the data of test_a_2 is not inserted at all. Is this a bug?


Re: Questions about the ChangelogNormalize operator in the stages generated by the Flink SQL upsert-kafka connector

2021-03-04 Post by Jark Wu
1. ChangelogNormalize is added by default for upsert-kafka.
2. ChangelogNormalize runs with the environment parallelism, so it can break through the
parallelism limit you mentioned. It also works for kafka + canal-json, but you have to enable
it with the option table.exec.source.cdc-events-duplicate = true [1]. Note, however, that
ChangelogNormalize is a stateful node with its own performance cost, so the overall
performance may end up worse than a plain forward.

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/config/#table-exec-source-cdc-events-duplicate
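A minimal sketch of turning this on for a kafka + canal-json source (the DDL and inserts are omitted; per the docs the source table also needs a PRIMARY KEY declared for the normalization to key on):

```
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CdcDedupConfig {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Tell the planner the CDC stream may contain duplicate events; it will then insert a
        // stateful ChangelogNormalize node keyed by the primary key of the source table.
        tEnv.getConfig().getConfiguration()
                .setBoolean("table.exec.source.cdc-events-duplicate", true);

        // CREATE TABLE ... WITH ('connector' = 'kafka', 'format' = 'canal-json', ...) and the
        // INSERT statements would follow here.
    }
}
```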

On Thu, 4 Mar 2021 at 15:30, Qishang  wrote:

> Hi community,
> Flink 1.12.1
>
> Our current pipeline ingests data from kafka via canal-json and writes it to a DB. Since the
> topic has only 1 partition, setting a higher parallelism has no effect for an ETL job that
> contains only forward operators.
>
> insert into table_a select id,udf(a),b,c from table_b;
>
> I noticed that the plan produced by EXPLAIN for the upsert-kafka connector contains a
> ChangelogNormalize stage that can be partitioned.
> 1. Is ChangelogNormalize added by default, or can it be configured somewhere?
> 2. Does it lift the parallelism limit imposed by the number of Kafka partitions only for
> upsert-kafka? Can it be used in the scenario I described above?
>
> ```
> == Physical Execution Plan ==
> Stage 1 : Data Source
> content : Source: TableSourceScan(table=[[default_catalog,
> default_database, temp_table]], fields=[id...])
>
> Stage 3 : Operator
> content : ChangelogNormalize(key=[id])
> ship_strategy : HASH
>
> Stage 4 : Operator
> content : Calc(select=[...])
> ship_strategy : FORWARD
>
> Stage 5 : Data Sink
> content : Sink: Sink(table=[default_catalog.default_database.table_a],
> fields=[id...])
> ship_strategy : FORWARD
> ```
>


Re: [DISCUSS] Deprecation and removal of the legacy SQL planner

2021-03-04 Post by Jark Wu
big +1 from my side.

Best,
Jark

On Thu, 4 Mar 2021 at 20:59, Leonard Xu  wrote:

> +1 for the roadmap.
>
> Thanks Timo for driving this.
>
> Best,
> Leonard
>
> > On Mar 4, 2021, at 20:40, Timo Walther wrote:
> >
> > Last call for feedback on this topic.
> >
> > It seems everyone agrees to finally complete FLIP-32. Since FLIP-32 has
> been accepted for a very long time, I think we don't need another voting
> thread for executing the last implementation step. Please let me know if
> you think differently.
> >
> > I will start deprecating the affected classes and interfaces beginning
> of next week.
> >
> > Regards,
> > Timo
> >
> >
> > On 26.02.21 15:46, Seth Wiesman wrote:
> >> Strong +1
> >> Having two planners is confusing to users and the diverging semantics
> make
> >> it difficult to provide useful learning material. It is time to rip the
> >> bandage off.
> >> Seth
> >> On Fri, Feb 26, 2021 at 12:54 AM Kurt Young  wrote:
> >>>  breaking
> >>> change.>
> >>>
> >>> Hi Timo,
> >>>
> >>> First of all I want to thank you for introducing this planner design
> back
> >>> in 1.9, this is a great work
> >>> that allows lots of blink features to be merged to Flink in a
> reasonably
> >>> short time. It greatly
> >>> accelerates the evolution speed of Table & SQL.
> >>>
> >>> Everything comes with a cost, as you said, right now we are facing the
> >>> overhead of maintaining
> >>> two planners and it causes bugs and also increases imbalance between
> these
> >>> two planners. As
> >>> a developer and also for the good of all Table & SQL users, I also
> think
> >>> it's better for us to be more
> >>> focused on a single planner.
> >>>
> >>> Your proposed roadmap looks good to me, +1 from my side and thanks
> >>> again for all your efforts!
> >>>
> >>> Best,
> >>> Kurt
> >>>
> >>>
> >>> On Thu, Feb 25, 2021 at 5:01 PM Timo Walther 
> wrote:
> >>>
>  Hi everyone,
> 
>  since Flink 1.9 we have supported two SQL planners. Most of the
> original
>  plan of FLIP-32 [1] has been implemented. The Blink code merge has
> been
>  completed and many additional features have been added exclusively to
>  the new planner. The new planner is now in a much better shape than
> the
>  legacy one.
> 
>  In order to avoid user confusion, reduce duplicate code, and improve
>  maintainability and testing times of the Flink project as a whole we
>  would like to propose the following steps to complete FLIP-32:
> 
>  In Flink 1.13:
>  - Deprecate the `flink-table-planner` module
>  - Deprecate `BatchTableEnvironment` for both Java, Scala, and Python
> 
>  In Flink 1.14:
>  - Drop `flink-table-planner` early
>  - Drop many deprecated interfaces and API on demand
>  - Rename `flink-table-planner-blink` to `flink-table-planner`
>  - Rename `flink-table-runtime-blink` to `flink-table-runtime`
>  - Remove references of "Blink" in the code base
> 
>  This will have an impact on users that still use DataSet API together
>  with Table API. With this change we will not support converting
> between
>  DataSet API and Table API anymore. We hope to compensate the missing
>  functionality in the new unified TableEnvironment and/or the batch
> mode
>  in DataStream API during 1.14 and 1.15. For this, we are looking for
>  further feedback which features are required in Table API/DataStream
> API
>  to have a smooth migration path.
> 
>  Looking forward to your feedback.
> 
>  Regards,
>  Timo
> 
>  [1]
> 
> 
> >>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-32%3A+Restructure+flink-table+for+future+contributions
> 
> >>>
> >
>
>


Flink SQL question (original subject garbled by the mail encoding)

2021-03-04 Post by ????
(The message body was garbled by the mail encoding; the recoverable part is the example below,
which registers a DataStream as a table and runs a SQL query on it.)

   example:
  tEnv.registerDataStream("tableName", dataStream, "id, name, age, time");
  Table result = tEnv.sqlQuery("SQL");

Re: [DISCUSS] Deprecation and removal of the legacy SQL planner

2021-03-04 Post by Leonard Xu
+1 for the roadmap.

Thanks Timo for driving this.

Best,
Leonard

> On Mar 4, 2021, at 20:40, Timo Walther wrote:
> 
> Last call for feedback on this topic.
> 
> It seems everyone agrees to finally complete FLIP-32. Since FLIP-32 has been 
> accepted for a very long time, I think we don't need another voting thread 
> for executing the last implementation step. Please let me know if you think 
> differently.
> 
> I will start deprecating the affected classes and interfaces beginning of 
> next week.
> 
> Regards,
> Timo
> 
> 
> On 26.02.21 15:46, Seth Wiesman wrote:
>> Strong +1
>> Having two planners is confusing to users and the diverging semantics make
>> it difficult to provide useful learning material. It is time to rip the
>> bandage off.
>> Seth
>> On Fri, Feb 26, 2021 at 12:54 AM Kurt Young  wrote:
>>> 
>>> Hi Timo,
>>> 
>>> First of all I want to thank you for introducing this planner design back
>>> in 1.9, this is a great work
>>> that allows lots of blink features to be merged to Flink in a reasonably
>>> short time. It greatly
>>> accelerates the evolution speed of Table & SQL.
>>> 
>>> Everything comes with a cost, as you said, right now we are facing the
>>> overhead of maintaining
>>> two planners and it causes bugs and also increases imbalance between these
>>> two planners. As
>>> a developer and also for the good of all Table & SQL users, I also think
>>> it's better for us to be more
>>> focused on a single planner.
>>> 
>>> Your proposed roadmap looks good to me, +1 from my side and thanks
>>> again for all your efforts!
>>> 
>>> Best,
>>> Kurt
>>> 
>>> 
>>> On Thu, Feb 25, 2021 at 5:01 PM Timo Walther  wrote:
>>> 
 Hi everyone,
 
 since Flink 1.9 we have supported two SQL planners. Most of the original
 plan of FLIP-32 [1] has been implemented. The Blink code merge has been
 completed and many additional features have been added exclusively to
 the new planner. The new planner is now in a much better shape than the
 legacy one.
 
 In order to avoid user confusion, reduce duplicate code, and improve
 maintainability and testing times of the Flink project as a whole we
 would like to propose the following steps to complete FLIP-32:
 
 In Flink 1.13:
 - Deprecate the `flink-table-planner` module
 - Deprecate `BatchTableEnvironment` for both Java, Scala, and Python
 
 In Flink 1.14:
 - Drop `flink-table-planner` early
 - Drop many deprecated interfaces and API on demand
 - Rename `flink-table-planner-blink` to `flink-table-planner`
 - Rename `flink-table-runtime-blink` to `flink-table-runtime`
 - Remove references of "Blink" in the code base
 
 This will have an impact on users that still use DataSet API together
 with Table API. With this change we will not support converting between
 DataSet API and Table API anymore. We hope to compensate the missing
 functionality in the new unified TableEnvironment and/or the batch mode
 in DataStream API during 1.14 and 1.15. For this, we are looking for
 further feedback which features are required in Table API/DataStream API
 to have a smooth migration path.
 
 Looking forward to your feedback.
 
 Regards,
 Timo
 
 [1]
 
 
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-32%3A+Restructure+flink-table+for+future+contributions
 
>>> 
> 



Re: [DISCUSS] Deprecation and removal of the legacy SQL planner

2021-03-04 Post by Timo Walther

Last call for feedback on this topic.

It seems everyone agrees to finally complete FLIP-32. Since FLIP-32 has 
been accepted for a very long time, I think we don't need another voting 
thread for executing the last implementation step. Please let me know if 
you think differently.


I will start deprecating the affected classes and interfaces beginning 
of next week.


Regards,
Timo


On 26.02.21 15:46, Seth Wiesman wrote:

Strong +1

Having two planners is confusing to users and the diverging semantics make
it difficult to provide useful learning material. It is time to rip the
bandage off.

Seth

On Fri, Feb 26, 2021 at 12:54 AM Kurt Young  wrote:




Hi Timo,

First of all I want to thank you for introducing this planner design back
in 1.9, this is a great work
that allows lots of blink features to be merged to Flink in a reasonably
short time. It greatly
accelerates the evolution speed of Table & SQL.

Everything comes with a cost, as you said, right now we are facing the
overhead of maintaining
two planners and it causes bugs and also increases imbalance between these
two planners. As
a developer and also for the good of all Table & SQL users, I also think
it's better for us to be more
focused on a single planner.

Your proposed roadmap looks good to me, +1 from my side and thanks
again for all your efforts!

Best,
Kurt


On Thu, Feb 25, 2021 at 5:01 PM Timo Walther  wrote:


Hi everyone,

since Flink 1.9 we have supported two SQL planners. Most of the original
plan of FLIP-32 [1] has been implemented. The Blink code merge has been
completed and many additional features have been added exclusively to
the new planner. The new planner is now in a much better shape than the
legacy one.

In order to avoid user confusion, reduce duplicate code, and improve
maintainability and testing times of the Flink project as a whole we
would like to propose the following steps to complete FLIP-32:

In Flink 1.13:
- Deprecate the `flink-table-planner` module
- Deprecate `BatchTableEnvironment` for both Java, Scala, and Python

In Flink 1.14:
- Drop `flink-table-planner` early
- Drop many deprecated interfaces and API on demand
- Rename `flink-table-planner-blink` to `flink-table-planner`
- Rename `flink-table-runtime-blink` to `flink-table-runtime`
- Remove references of "Blink" in the code base

This will have an impact on users that still use DataSet API together
with Table API. With this change we will not support converting between
DataSet API and Table API anymore. We hope to compensate the missing
functionality in the new unified TableEnvironment and/or the batch mode
in DataStream API during 1.14 and 1.15. For this, we are looking for
further feedback which features are required in Table API/DataStream API
to have a smooth migration path.

Looking forward to your feedback.

Regards,
Timo

[1]



https://cwiki.apache.org/confluence/display/FLINK/FLIP-32%3A+Restructure+flink-table+for+future+contributions










jar conflict when writing hive decimal types on the flink 1.12 branch

2021-03-04 Post by kandy.wang
Flink version: 1.12
Hive version: 2.3.4
Writing a hive decimal type on the flink 1.12 branch fails with:
java.lang.NoSuchMethodError: 
org.apache.hadoop.hive.serde2.io.HiveDecimalWritable.serialize64(I)J
at 
org.apache.orc.impl.ColumnStatisticsImpl$Decimal64StatisticsImpl.updateDecimal(ColumnStatisticsImpl.java:1010)
at 
org.apache.orc.impl.writer.DecimalTreeWriter.writeBatch(DecimalTreeWriter.java:99)
at 
org.apache.orc.impl.writer.DecimalTreeWriter.writeBatch(DecimalTreeWriter.java:159)
at 
org.apache.orc.impl.writer.StructTreeWriter.writeRootBatch(StructTreeWriter.java:56)
at org.apache.orc.impl.WriterImpl.addRowBatch(WriterImpl.java:557)
at 
org.apache.flink.orc.writer.OrcBulkWriter.addElement(OrcBulkWriter.java:58)
at 
org.apache.flink.table.filesystem.FileSystemTableSink$ProjectionBulkFactory$1.addElement(FileSystemTableSink.java:589)
at 
org.apache.flink.table.filesystem.FileSystemTableSink$ProjectionBulkFactory$1.addElement(FileSystemTableSink.java:585)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.write(BulkPartWriter.java:48)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:209)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:290)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
at 
org.apache.flink.table.filesystem.stream.AbstractStreamingWriter.processElement(AbstractStreamingWriter.java:140)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at 
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at StreamExecCalc$154.processElement(Unknown Source)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:137)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:761)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:215)

This method only exists as of hive-exec version 3.1.2.

Question: what compatibility issues are there between the hive version and the orc version,
and how can this be resolved?




Problem with dynamically changing options in SQL

2021-03-04 Post by 酷酷的浑蛋
StatementSet statementSet = tableEnvironment.createStatementSet();
String sql1 = "insert into test select a,b,c from test_a_12342 /*+ 
OPTIONS('table-name'='test_a_1')*/";
String sql2 = "insert into test select a,b,c from test_a_12342 /*+ 
OPTIONS('table-name'='test_a_2')*/";
statementSet.addInsertSql(sql1);
statementSet.addInsertSql(sql2);
statementSet.execute();


With the SQL above, the final result is that the data of table test_a_1 is inserted twice,
while the data of test_a_2 is not inserted at all. Is this a bug?

How can a Flink SQL job be restored when there is no jar?

2021-03-04 Post by huayuan
As the subject says:
the official way to restore is flink run -s path xxx.jar,
but how do you restore a Flink SQL job that has no jar?

StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv,
bsSettings);
Configuration configuration = bsTableEnv.getConfig().getConfiguration();
configuration.setString("execution.savepoint.path","xxx") does not seem to work.
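One thing that might be worth trying (my assumption, not verified in this thread): execution.savepoint.path is a client-side setting that is read when the job is submitted, so passing it into the environment's Configuration at creation time may work where setting it on the TableConfig afterwards does not. A minimal sketch, with an illustrative path:

```
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class RestoreSqlJob {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Point this at the actual retained checkpoint or savepoint directory.
        conf.setString("execution.savepoint.path", "hdfs:///flink/savepoints/savepoint-xxxx");

        // Pass the configuration in when the environment is created so the restore setting is
        // part of the submission.
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(
                bsEnv,
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Re-run exactly the same SQL as before; state is matched by the auto-generated
        // operator IDs, so the statements must not change between runs.
        // bsTableEnv.executeSql("...");
    }
}
```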





How can a Flink SQL job be restored when there is no jar?

2021-03-04 Post by huayuan
As the subject says:
the official way to restore is flink run -s path xxx.jar,
but how do you restore a Flink SQL job that has no jar?

StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv,
bsSettings);
Configuration configuration = bsTableEnv.getConfig().getConfiguration();
configuration.setString("execution.savepoint.path","xxx") does not seem to work.





Flink SQL submitted remotely with checkpointing enabled: how to restore when there is no jar?

2021-03-04 Post by huayuan
As the subject says:
the official way to restore is flink run -s path xxx.jar,
but how do you restore a Flink SQL job that has no jar?

StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv,
bsSettings);
Configuration configuration = bsTableEnv.getConfig().getConfiguration();
configuration.setString("execution.savepoint.path","xxx") does not seem to work.





Unsubscribe

2021-03-04 Post by tiantingting5...@163.com
Unsubscribe



tiantingting5...@163.com


Question about using yarn.containers.vcores

2021-03-04 Post by 阿华田
I use -yD yarn.containers.vcores=4
to set the total number of CPU cores for the Flink job. The configured parameter seems to take
effect, but the number of cores actually requested is still one CPU per slot.
When using yarn.containers.vcores, do I also need to enable CPU scheduling in YARN?



Re: How to dynamically configure the path of the flink run client log file?

2021-03-04 Post by Yang Wang
Exporting FLINK_LOG_DIR before running the flink run command should do it.

Best,
Yang

小旋锋 wrote on Wed, Mar 3, 2021 at 12:12 PM:

> Hi all.
> When a Flink job is submitted via flink run, the log file produced by the flink client goes
> to $FLINK_HOME/log by default.
> We need the logs of each submission to go into a separate directory. How can the log file
> path be specified dynamically for each flink run invocation?
>
>
> Notes:
> 1. Setting the env.log.dir option works in flink-conf.yaml, but specifying it dynamically
> via -yD or -D doesn't seem to work.
> 2. flink version: 1.10
>
>
> Thanks.