Re: Flink SQL upsert-kafka connector 生成的 Stage ChangelogNormalize 算子的疑问

2021-03-04 文章 Qishang
Hi Jark. 对于 upsert-kafka connector 有两个疑问: 1. upsert-kafka 没有像 kafka connector 里面设置 offset 的参数 `scan.startup.* ` ,我试了下每次都是从 earliest 开始; 2. 中间的 operator ChangelogNormalize 会放大数据量,输入一条数据,经过 ChangelogNormalize 算子之后会变成2条,这个不是很理解? Qishang 于2021年3月5日周五 上午11:14写道: > > 某些原因导致上游 kafka partition 只有一个,业务

Re: 回复:状态恢复参数顺序 -s

2021-03-04 文章 dushang
Okay ,THX -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re:Re: sql 动态修改参数问题

2021-03-04 文章 Michael Ran
我没重现这个BUG,是1.12 的版本吗? 在 2021-03-04 23:12:11,"Jark Wu" 写道: >看起来是个分段优化复用节点的bug,可以去 JIRA 开个 issue。 > >Best, >Jark > >On Thu, 4 Mar 2021 at 19:37, 酷酷的浑蛋 wrote: > >> StatementSet statementSet = tableEnvironment.createStatementSet(); >> String sql1 = "insert into test select a,b,c from test_a_123

回复:状态恢复参数顺序 -s

2021-03-04 文章 allanqinjy
你放在jar包后就当作jar的参数了 ,你可以试试这样在你的main中获取参数 s 就是你的path。 | | allanqinjy | | allanqi...@163.com | 签名由网易邮箱大师定制 在2021年03月5日 14:57,dushang<1823103...@qq.com> 写道: ../bin/flink run -s path -c class test.jar 这里面的-s 必须在最前面么,我换成 ../bin/flink run -c class test.jar -s path 不生效。 -- Sent from: http://apac

状态恢复参数顺序 -s

2021-03-04 文章 dushang
../bin/flink run -s path -c class test.jar 这里面的-s 必须在最前面么,我换成 ../bin/flink run -c class test.jar -s path 不生效。 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re:回复:消息积压如何优化

2021-03-04 文章 Michael Ran
一般a->b->c->d->e->f 算子。如果f 跟不上,会导致abcde 出现被压,所以看被压前一个洛。还有就是看了解算子,根据经验 在 2021-03-05 14:39:40,"allanqinjy" 写道: >你好, > > 消费速度跟不上写入速度,通过调试并发可以提高消费,但是不能一直这样靠着修改并发度来做优化。你说的看哪个算子问题,能具体点吗?怎么就算有问题了?一旦被压了,基本你看算子,基本的被压肯定都是正常的只有source那里才是high,被压会一级一级的反到source。你看算子的metrics也能只能看进入的量,出去的量,sin

回复:消息积压如何优化

2021-03-04 文章 allanqinjy
你好, 消费速度跟不上写入速度,通过调试并发可以提高消费,但是不能一直这样靠着修改并发度来做优化。你说的看哪个算子问题,能具体点吗?怎么就算有问题了?一旦被压了,基本你看算子,基本的被压肯定都是正常的只有source那里才是high,被压会一级一级的反到source。你看算子的metrics也能只能看进入的量,出去的量,sink的出去的量是第三方flink ui上应该也看不到,这样如何排查是哪个算子的具体问题?你们是有什么好的方法吗? | | allanqinjy | | allanqi...@163.com | 签名由网易邮箱大师定制 在2021年03月5日 14:22

Re:消息积压如何优化

2021-03-04 文章 Michael Ran
看看哪个算子问题,增加那个算子并发。 或者优化那个算子执行。 先找找原因 在 2021-03-05 11:05:14,"allanqinjy" 写道: > > >hi, >由于消息队列量特别大,在flink接入以后被压严重。除了修改并行度以外还有没有其他的优化方案!!! >| | >allanqinjy >| >| >allanqi...@163.com >| >签名由网易邮箱大师定制 >

回复: yarn.containers.vcores使用问题

2021-03-04 文章 阿华田
已经可以了 flink1.11 在jobManger的ui页面配置信息那块能看到这个参数是否配置成功 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制 在2021年03月5日 11:44,Xintong Song 写道: 你的 flink 是什么版本? 部署模式是 per-job 还是 session? “看到任务配置参数也生效了”具体是在哪里看到的? Thank you~ Xintong Song On Thu, Mar 4, 2021 at 4:35 PM 阿华田 wrote: 使用-yD yarn.containe

Re: Flink1.12 如何使用代码提交Batch的Sql

2021-03-04 文章 shougou
感谢Qishang同学的回复! 我主要的目标是往一个运行的Flink集群上提交SQL,因为需要开窗写hive表,只能提交Batch的SQL。 如果是stream的,我是这么提交Sql,可以提交到Flink集群上: final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("tkbd01", 8081); //StreamExecutionEnvironment bsEnv = StreamExecutionEnvir

Re: yarn.containers.vcores使用问题

2021-03-04 文章 Xintong Song
你的 flink 是什么版本? 部署模式是 per-job 还是 session? “看到任务配置参数也生效了”具体是在哪里看到的? Thank you~ Xintong Song On Thu, Mar 4, 2021 at 4:35 PM 阿华田 wrote: > 使用-yD yarn.containers.vcores=4 > 区设置flink任务的总的cpu核数,看到任务配置参数也生效了 但是实际申请核数还是按照 cpu slot一比一申请的 > 各位大佬使用yarn.containers.vcores是不是还需要开启yarn的cpu 调度 > | | > 阿华田 >

退订

2021-03-04 文章 Gauler Tan

Re: Flink SQL upsert-kafka connector 生成的 Stage ChangelogNormalize 算子的疑问

2021-03-04 文章 Qishang
某些原因导致上游 kafka partition 只有一个,业务逻辑大都是关联维表或者 UDF 调用 API,这个就很NICE。。 学到了,感谢。 Jark Wu 于2021年3月4日周四 下午11:11写道: > 1. 对于 upsert-kafka 会默认加上 ChangelogNormalize > 2. ChangelogNormalize 会用 env 并发,所以可以认为能突破你说的并发限制。kafka + canal-json > 也能用,但是要加上 table.exec.source.cdc-events-duplicate = true > 参数[1]才能开启。但是要注

Re: flink sql中如何使用异步io关联维表?

2021-03-04 文章 Leonard Xu
目前Flink SQL 中的connector都没实现异步io关联维表,接口是上已经支持了的,如果是自己实现可以参考[1] 另外,HBase connector 社区有人正在支持异步io关联维表,预计1.13可以使用[2] 祝好 [1]https://github.com/apache/flink/blob/73cdd3d0d9f6a807b3e47c09eef7983c9aa180c7/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValu

消息积压如何优化

2021-03-04 文章 allanqinjy
hi, 由于消息队列量特别大,在flink接入以后被压严重。除了修改并行度以外还有没有其他的优化方案!!! | | allanqinjy | | allanqi...@163.com | 签名由网易邮箱大师定制

Re: flink sql中如何使用异步io关联维表?

2021-03-04 文章 peibin wang
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/legacy.html#temporal-table-function-join flink sql的 temporal-table join 应该都是通过异步io来关联维表的 casel.chen 于2021年3月3日周三 下午10:54写道: > flink sql中如何使用异步io关联维表?官网文档有介绍么?

消息队列量级特别如何优化消费

2021-03-04 文章 allanqinjy
hi, 由于消息队列量特别大,在flink接入以后被压严重。除了修改并行度以外还有没有其他的优化方案!!! | | allanqinjy | | allanqi...@163.com | 签名由网易邮箱大师定制

Re: Flink1.12 如何使用代码提交Batch的Sql

2021-03-04 文章 Qishang
Hi shougou. 你要找的是不是这个[1] // **// BLINK BATCH QUERY// **import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.TableEnvironment; EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build

flink+iceberg/hudi 在大数据量下的表现如何?

2021-03-04 文章 Husky Zeng
对数据量千亿级别的大表,每小时进行亿级别的upsert操作,iceberg 能撑得住吗?hudi在这方面是不是更擅长一些? 还有一点,iceberg似乎不支持sql的upsert,只支持编程式的。 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Flink1.12 如何使用代码提交Batch的Sql

2021-03-04 文章 shougou
我们知道如果在1.12里使用Table API来提交Batch的作业,比如: StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.BATCH); 但是,如果提交Sql作业的话: StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); Table result = tableEnv.sqlQuery(...);

flink?????? task manager????????

2021-03-04 文章 ????
flink?? : org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager 'cdh-slave-01/130.0.0.1:41800'. This might indicate that the remote ta

退订

2021-03-04 文章 陈刚
退订

Re: sql 动态修改参数问题

2021-03-04 文章 Jark Wu
看起来是个分段优化复用节点的bug,可以去 JIRA 开个 issue。 Best, Jark On Thu, 4 Mar 2021 at 19:37, 酷酷的浑蛋 wrote: > StatementSet statementSet = tableEnvironment.createStatementSet(); > String sql1 = "insert into test select a,b,c from test_a_12342 /*+ > OPTIONS('table-name'='test_a_1')*/"; > String sql2 = "insert into

Re: Flink SQL upsert-kafka connector 生成的 Stage ChangelogNormalize 算子的疑问

2021-03-04 文章 Jark Wu
1. 对于 upsert-kafka 会默认加上 ChangelogNormalize 2. ChangelogNormalize 会用 env 并发,所以可以认为能突破你说的并发限制。kafka + canal-json 也能用,但是要加上 table.exec.source.cdc-events-duplicate = true 参数[1]才能开启。但是要注意 ChangelogNormalize 是一个 stateful 节点,本身也是有性能开销的,总体性能可能还不如 forward。 Best, Jark [1]: https://ci.apache.org/projects/f

Re: [DISCUSS] Deprecation and removal of the legacy SQL planner

2021-03-04 文章 Jark Wu
big +1 from my side. Best, Jark On Thu, 4 Mar 2021 at 20:59, Leonard Xu wrote: > +1 for the roadmap. > > Thanks Timo for driving this. > > Best, > Leonard > > > 在 2021年3月4日,20:40,Timo Walther 写道: > > > > Last call for feedback on this topic. > > > > It seems everyone agrees to finally complete

Flink??????????????????????sql????????????

2021-03-04 文章 ????
     flink??sql??sql?              example:       tEnv.registerDataStream("tableName", dataStream, "id, name, age ,time");       Table result = tEnv.sqlQuery("SQL" ); ??

Re: [DISCUSS] Deprecation and removal of the legacy SQL planner

2021-03-04 文章 Leonard Xu
+1 for the roadmap. Thanks Timo for driving this. Best, Leonard > 在 2021年3月4日,20:40,Timo Walther 写道: > > Last call for feedback on this topic. > > It seems everyone agrees to finally complete FLIP-32. Since FLIP-32 has been > accepted for a very long time, I think we don't need another votin

Re: [DISCUSS] Deprecation and removal of the legacy SQL planner

2021-03-04 文章 Timo Walther
Last call for feedback on this topic. It seems everyone agrees to finally complete FLIP-32. Since FLIP-32 has been accepted for a very long time, I think we don't need another voting thread for executing the last implementation step. Please let me know if you think differently. I will start

flink 1.12分支写入hive decimal类型jar冲突

2021-03-04 文章 kandy.wang
flink版本:1.12 hive版本:2.3.4 flink 1.12分支写入hive decimal类型报错: java.lang.NoSuchMethodError: org.apache.hadoop.hive.serde2.io.HiveDecimalWritable.serialize64(I)J at org.apache.orc.impl.ColumnStatisticsImpl$Decimal64StatisticsImpl.updateDecimal(ColumnStatisticsImpl.java:1010) at org.apache.orc.

sql 动态修改参数问题

2021-03-04 文章 酷酷的浑蛋
StatementSet statementSet = tableEnvironment.createStatementSet(); String sql1 = "insert into test select a,b,c from test_a_12342 /*+ OPTIONS('table-name'='test_a_1')*/"; String sql2 = "insert into test select a,b,c from test_a_12342 /*+ OPTIONS('table-name'='test_a_2')*/"; statementSet.addInsert

flink sql没有jar包如何恢复

2021-03-04 文章 huayuan
如题 官方的恢复是flink run -s path xxx.jar 那么flink sql没有jar包如何恢复呢 StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); Configuration configuration = bsTableEnv.getConfig().getConfiguration(); configuration.setString("execution.savepoint.path","xxx")貌似不管用 -- Sent from: h

flink sql没有jar包如何恢复?

2021-03-04 文章 huayuan
如题 官方的恢复是flink run -s path xxx.jar 那么flink sql没有jar包如何恢复呢 StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); Configuration configuration = bsTableEnv.getConfig().getConfiguration(); configuration.setString("execution.savepoint.path","xxx")貌似不管用 -- Sent from: h

远程提交flink sql设置了checkpoint,flink sql没有jar包如何恢复呢?

2021-03-04 文章 huayuan
如题 官方的恢复是flink run -s path xxx.jar 那么flink sql没有jar包如何恢复呢 StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); Configuration configuration = bsTableEnv.getConfig().getConfiguration(); configuration.setString("execution.savepoint.path","xxx")貌似不管用 -- Sent from: h

退订

2021-03-04 文章 tiantingting5...@163.com
退订 tiantingting5...@163.com

yarn.containers.vcores使用问题

2021-03-04 文章 阿华田
使用-yD yarn.containers.vcores=4 区设置flink任务的总的cpu核数,看到任务配置参数也生效了 但是实际申请核数还是按照 cpu slot一比一申请的 各位大佬使用yarn.containers.vcores是不是还需要开启yarn的cpu 调度 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制

Re: 如何动态配置 flink run 的 client日志文件的路径?

2021-03-04 文章 Yang Wang
你在运行flink run命令以前export一下FLINK_LOG_DIR应该就可以的 Best, Yang 小旋锋 于2021年3月3日周三 下午12:12写道: > Hi all. > 通过flink run提交Flink作业,flink client产生的日志文件默认是在 $FLINK_HOME/log 下。 > 需要将每个作业提交产生的日志分别放到不同的目录下,那么请问如何动态指定每次flink run的日志文件的路径呢? > > > 附: > 1. 通过设置 env.log.dir 配置项的值,在 flink-conf.yaml 文件中会生效,但通过 -yD 或 -D >