Hi Jark.
对于 upsert-kafka connector 有两个疑问:
1. upsert-kafka 没有像 kafka connector 里面设置 offset 的参数 `scan.startup.* `
,我试了下每次都是从 earliest 开始;
2. 中间的 operator ChangelogNormalize 会放大数据量,输入一条数据,经过 ChangelogNormalize
算子之后会变成2条,这个不是很理解?
Qishang 于2021年3月5日周五 上午11:14写道:
>
> 某些原因导致上游 kafka partition 只有一个,业务
Okay ,THX
--
Sent from: http://apache-flink.147419.n8.nabble.com/
我没重现这个BUG,是1.12 的版本吗?
在 2021-03-04 23:12:11,"Jark Wu" 写道:
>看起来是个分段优化复用节点的bug,可以去 JIRA 开个 issue。
>
>Best,
>Jark
>
>On Thu, 4 Mar 2021 at 19:37, 酷酷的浑蛋 wrote:
>
>> StatementSet statementSet = tableEnvironment.createStatementSet();
>> String sql1 = "insert into test select a,b,c from test_a_123
你放在jar包后就当作jar的参数了 ,你可以试试这样在你的main中获取参数 s 就是你的path。
| |
allanqinjy
|
|
allanqi...@163.com
|
签名由网易邮箱大师定制
在2021年03月5日 14:57,dushang<1823103...@qq.com> 写道:
../bin/flink run -s path -c class test.jar 这里面的-s 必须在最前面么,我换成 ../bin/flink
run -c class test.jar -s path 不生效。
--
Sent from: http://apac
../bin/flink run -s path -c class test.jar 这里面的-s 必须在最前面么,我换成 ../bin/flink
run -c class test.jar -s path 不生效。
--
Sent from: http://apache-flink.147419.n8.nabble.com/
一般a->b->c->d->e->f 算子。如果f 跟不上,会导致abcde 出现被压,所以看被压前一个洛。还有就是看了解算子,根据经验
在 2021-03-05 14:39:40,"allanqinjy" 写道:
>你好,
>
> 消费速度跟不上写入速度,通过调试并发可以提高消费,但是不能一直这样靠着修改并发度来做优化。你说的看哪个算子问题,能具体点吗?怎么就算有问题了?一旦被压了,基本你看算子,基本的被压肯定都是正常的只有source那里才是high,被压会一级一级的反到source。你看算子的metrics也能只能看进入的量,出去的量,sin
你好,
消费速度跟不上写入速度,通过调试并发可以提高消费,但是不能一直这样靠着修改并发度来做优化。你说的看哪个算子问题,能具体点吗?怎么就算有问题了?一旦被压了,基本你看算子,基本的被压肯定都是正常的只有source那里才是high,被压会一级一级的反到source。你看算子的metrics也能只能看进入的量,出去的量,sink的出去的量是第三方flink
ui上应该也看不到,这样如何排查是哪个算子的具体问题?你们是有什么好的方法吗?
| |
allanqinjy
|
|
allanqi...@163.com
|
签名由网易邮箱大师定制
在2021年03月5日 14:22
看看哪个算子问题,增加那个算子并发。 或者优化那个算子执行。 先找找原因
在 2021-03-05 11:05:14,"allanqinjy" 写道:
>
>
>hi,
>由于消息队列量特别大,在flink接入以后被压严重。除了修改并行度以外还有没有其他的优化方案!!!
>| |
>allanqinjy
>|
>|
>allanqi...@163.com
>|
>签名由网易邮箱大师定制
>
已经可以了 flink1.11 在jobManger的ui页面配置信息那块能看到这个参数是否配置成功
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
在2021年03月5日 11:44,Xintong Song 写道:
你的 flink 是什么版本?
部署模式是 per-job 还是 session?
“看到任务配置参数也生效了”具体是在哪里看到的?
Thank you~
Xintong Song
On Thu, Mar 4, 2021 at 4:35 PM 阿华田 wrote:
使用-yD yarn.containe
感谢Qishang同学的回复!
我主要的目标是往一个运行的Flink集群上提交SQL,因为需要开窗写hive表,只能提交Batch的SQL。
如果是stream的,我是这么提交Sql,可以提交到Flink集群上:
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.createRemoteEnvironment("tkbd01", 8081);
//StreamExecutionEnvironment bsEnv =
StreamExecutionEnvir
你的 flink 是什么版本?
部署模式是 per-job 还是 session?
“看到任务配置参数也生效了”具体是在哪里看到的?
Thank you~
Xintong Song
On Thu, Mar 4, 2021 at 4:35 PM 阿华田 wrote:
> 使用-yD yarn.containers.vcores=4
> 区设置flink任务的总的cpu核数,看到任务配置参数也生效了 但是实际申请核数还是按照 cpu slot一比一申请的
> 各位大佬使用yarn.containers.vcores是不是还需要开启yarn的cpu 调度
> | |
> 阿华田
>
某些原因导致上游 kafka partition 只有一个,业务逻辑大都是关联维表或者 UDF 调用 API,这个就很NICE。。
学到了,感谢。
Jark Wu 于2021年3月4日周四 下午11:11写道:
> 1. 对于 upsert-kafka 会默认加上 ChangelogNormalize
> 2. ChangelogNormalize 会用 env 并发,所以可以认为能突破你说的并发限制。kafka + canal-json
> 也能用,但是要加上 table.exec.source.cdc-events-duplicate = true
> 参数[1]才能开启。但是要注
目前Flink SQL 中的connector都没实现异步io关联维表,接口是上已经支持了的,如果是自己实现可以参考[1]
另外,HBase connector 社区有人正在支持异步io关联维表,预计1.13可以使用[2]
祝好
[1]https://github.com/apache/flink/blob/73cdd3d0d9f6a807b3e47c09eef7983c9aa180c7/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValu
hi,
由于消息队列量特别大,在flink接入以后被压严重。除了修改并行度以外还有没有其他的优化方案!!!
| |
allanqinjy
|
|
allanqi...@163.com
|
签名由网易邮箱大师定制
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/legacy.html#temporal-table-function-join
flink sql的 temporal-table join 应该都是通过异步io来关联维表的
casel.chen 于2021年3月3日周三 下午10:54写道:
> flink sql中如何使用异步io关联维表?官网文档有介绍么?
hi,
由于消息队列量特别大,在flink接入以后被压严重。除了修改并行度以外还有没有其他的优化方案!!!
| |
allanqinjy
|
|
allanqi...@163.com
|
签名由网易邮箱大师定制
Hi shougou.
你要找的是不是这个[1]
// **// BLINK BATCH QUERY// **import
org.apache.flink.table.api.EnvironmentSettings;import
org.apache.flink.table.api.TableEnvironment;
EnvironmentSettings bbSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build
对数据量千亿级别的大表,每小时进行亿级别的upsert操作,iceberg 能撑得住吗?hudi在这方面是不是更擅长一些?
还有一点,iceberg似乎不支持sql的upsert,只支持编程式的。
--
Sent from: http://apache-flink.147419.n8.nabble.com/
我们知道如果在1.12里使用Table API来提交Batch的作业,比如:
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
但是,如果提交Sql作业的话:
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Table result = tableEnv.sqlQuery(...);
flink??
:
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
Connection unexpectedly closed by remote task manager
'cdh-slave-01/130.0.0.1:41800'. This might indicate that the remote ta
退订
看起来是个分段优化复用节点的bug,可以去 JIRA 开个 issue。
Best,
Jark
On Thu, 4 Mar 2021 at 19:37, 酷酷的浑蛋 wrote:
> StatementSet statementSet = tableEnvironment.createStatementSet();
> String sql1 = "insert into test select a,b,c from test_a_12342 /*+
> OPTIONS('table-name'='test_a_1')*/";
> String sql2 = "insert into
1. 对于 upsert-kafka 会默认加上 ChangelogNormalize
2. ChangelogNormalize 会用 env 并发,所以可以认为能突破你说的并发限制。kafka + canal-json
也能用,但是要加上 table.exec.source.cdc-events-duplicate = true
参数[1]才能开启。但是要注意 ChangelogNormalize 是一个 stateful 节点,本身也是有性能开销的,总体性能可能还不如
forward。
Best,
Jark
[1]:
https://ci.apache.org/projects/f
big +1 from my side.
Best,
Jark
On Thu, 4 Mar 2021 at 20:59, Leonard Xu wrote:
> +1 for the roadmap.
>
> Thanks Timo for driving this.
>
> Best,
> Leonard
>
> > 在 2021年3月4日,20:40,Timo Walther 写道:
> >
> > Last call for feedback on this topic.
> >
> > It seems everyone agrees to finally complete
flink??sql??sql?
example:
tEnv.registerDataStream("tableName", dataStream, "id,
name, age ,time");
Table result = tEnv.sqlQuery("SQL" );
??
+1 for the roadmap.
Thanks Timo for driving this.
Best,
Leonard
> 在 2021年3月4日,20:40,Timo Walther 写道:
>
> Last call for feedback on this topic.
>
> It seems everyone agrees to finally complete FLIP-32. Since FLIP-32 has been
> accepted for a very long time, I think we don't need another votin
Last call for feedback on this topic.
It seems everyone agrees to finally complete FLIP-32. Since FLIP-32 has
been accepted for a very long time, I think we don't need another voting
thread for executing the last implementation step. Please let me know if
you think differently.
I will start
flink版本:1.12
hive版本:2.3.4
flink 1.12分支写入hive decimal类型报错:
java.lang.NoSuchMethodError:
org.apache.hadoop.hive.serde2.io.HiveDecimalWritable.serialize64(I)J
at
org.apache.orc.impl.ColumnStatisticsImpl$Decimal64StatisticsImpl.updateDecimal(ColumnStatisticsImpl.java:1010)
at
org.apache.orc.
StatementSet statementSet = tableEnvironment.createStatementSet();
String sql1 = "insert into test select a,b,c from test_a_12342 /*+
OPTIONS('table-name'='test_a_1')*/";
String sql2 = "insert into test select a,b,c from test_a_12342 /*+
OPTIONS('table-name'='test_a_2')*/";
statementSet.addInsert
如题
官方的恢复是flink run -s path xxx.jar
那么flink sql没有jar包如何恢复呢
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv,
bsSettings);
Configuration configuration = bsTableEnv.getConfig().getConfiguration();
configuration.setString("execution.savepoint.path","xxx")貌似不管用
--
Sent from: h
如题
官方的恢复是flink run -s path xxx.jar
那么flink sql没有jar包如何恢复呢
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv,
bsSettings);
Configuration configuration = bsTableEnv.getConfig().getConfiguration();
configuration.setString("execution.savepoint.path","xxx")貌似不管用
--
Sent from: h
如题
官方的恢复是flink run -s path xxx.jar
那么flink sql没有jar包如何恢复呢
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv,
bsSettings);
Configuration configuration = bsTableEnv.getConfig().getConfiguration();
configuration.setString("execution.savepoint.path","xxx")貌似不管用
--
Sent from: h
退订
tiantingting5...@163.com
使用-yD yarn.containers.vcores=4
区设置flink任务的总的cpu核数,看到任务配置参数也生效了 但是实际申请核数还是按照 cpu slot一比一申请的
各位大佬使用yarn.containers.vcores是不是还需要开启yarn的cpu 调度
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
你在运行flink run命令以前export一下FLINK_LOG_DIR应该就可以的
Best,
Yang
小旋锋 于2021年3月3日周三 下午12:12写道:
> Hi all.
> 通过flink run提交Flink作业,flink client产生的日志文件默认是在 $FLINK_HOME/log 下。
> 需要将每个作业提交产生的日志分别放到不同的目录下,那么请问如何动态指定每次flink run的日志文件的路径呢?
>
>
> 附:
> 1. 通过设置 env.log.dir 配置项的值,在 flink-conf.yaml 文件中会生效,但通过 -yD 或 -D
>
36 matches
Mail list logo