flink oss ha

2021-08-26 Post by dker eandei
Hello:

The docs say OSS can be used as an FsStateBackend. When running Flink on Kubernetes in
high-availability mode, can high-availability.storageDir be configured to an OSS path? I tried it and got the following error:

[inline screenshot of the error message; the image is not preserved in this archive]
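For reference, a minimal sketch of the kind of flink-conf.yaml this setup would require (bucket name, endpoint, and credentials below are placeholders, and this assumes the flink-oss-fs-hadoop plugin is installed under plugins/):

```yaml
# Kubernetes HA with OSS as the HA storage directory (values are placeholders).
kubernetes.cluster-id: my-flink-cluster
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: oss://my-bucket/flink/ha
# OSS filesystem settings, picked up by the flink-oss-fs-hadoop plugin:
fs.oss.endpoint: oss-cn-hangzhou.aliyuncs.com
fs.oss.accessKeyId: <your-access-key-id>
fs.oss.accessKeySecret: <your-access-key-secret>
```

Whether the HA services accept an oss:// scheme still depends on the OSS filesystem plugin being loadable on the JobManager, which is what the reported error would need to be checked against.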




Re: flink sink oss problem

2021-08-26 Post by payne_z
Take a look at the PR I linked above; what it implements is exactly this, a DataStream sink to OSS. The usage is the same as sinking a DataStream to
HDFS:
return StreamingFileSink
    .forRowFormat(new Path(ossPath), new SimpleStringEncoder<String>("UTF-8"))
    .build();
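For context, a fuller sketch of such a job (the path and the source below are placeholder examples, and this assumes the OSS filesystem support from the PR is installed as a plugin):

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class OssSinkJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing is required for StreamingFileSink to commit finished files.
        env.enableCheckpointing(60_000);

        String ossPath = "oss://my-bucket/output";  // placeholder path
        StreamingFileSink<String> sink = StreamingFileSink
            .forRowFormat(new Path(ossPath), new SimpleStringEncoder<String>("UTF-8"))
            .build();

        env.fromElements("a", "b", "c")  // placeholder source
           .addSink(sink);
        env.execute("datastream-to-oss");
    }
}
```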




On Aug 27, 2021, at 14:34, Caizhi Weng wrote:

> [quoted reply and stack trace trimmed; identical to "Re: flink sink oss problem" and the original post below]

Re: flink sink oss problem

2021-08-26 Post by Caizhi Weng
Hi!

From the error this looks like a class conflict. Also, the OSS sink should not currently support streaming writes.

How exactly are you writing the DataStream to OSS? If convenient, please paste the code so everyone can take a closer look.

payne_z wrote on Fri, Aug 27, 2021, 14:02:

> When I apply the patch from https://github.com/apache/flink/pull/7798 to Flink
> 1.11.2, writing to OSS through the DataStream API does not work.
> [stack trace trimmed; identical to the one in the original post "flink sink oss problem" below]


Re: How can Flink use different hive_catalogs to access the Hive of two clusters, offline and real-time?

2021-08-26 Post by Caizhi Weng
Hi!

It looks like the Flink cluster cannot reach wx12-dsjrt-master001/xx.xx.xx.xx:8050. Check the network and the status of that port.
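Once connectivity is sorted out, registering two catalogs with separate conf dirs looks roughly like this sketch (catalog names, databases, and paths are placeholders; assuming Flink 1.13's HiveCatalog):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class TwoHiveCatalogs {
    public static void main(String[] args) {
        TableEnvironment tEnv =
            TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Each catalog points at its own hive-site.xml; the core-site.xml /
        // hdfs-site.xml for that cluster must also be resolvable, otherwise
        // metastore or RPC addresses fall back to the local cluster's config.
        HiveCatalog offline = new HiveCatalog("hive_offline", "default", "/etc/hive-conf-offline");
        HiveCatalog realtime = new HiveCatalog("hive_realtime", "default", "/etc/hive-conf-realtime");
        tEnv.registerCatalog("hive_offline", offline);
        tEnv.registerCatalog("hive_realtime", realtime);

        tEnv.useCatalog("hive_offline");
    }
}
```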

Jim Chen wrote on Fri, Aug 27, 2021, 13:59:

> [quoted message trimmed; identical to the original post below]


flink sink oss problem

2021-08-26 Post by payne_z
When I apply the patch from https://github.com/apache/flink/pull/7798 to Flink
1.11.2, writing to OSS through the DataStream API does not work.
The error is as follows:
java.lang.RuntimeException: java.lang.RuntimeException: class org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.GroupMappingServiceProvider
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2624)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.<init>(Groups.java:106)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.<init>(Groups.java:101)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:448)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:331)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:298)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.doSubjectLogin(UserGroupInformation.java:1780)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.createLoginUser(UserGroupInformation.java:704)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:654)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:565)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem.initialize(AliyunOSSFileSystem.java:336)
    at org.apache.flink.fs.osshadoop.OSSFileSystemFactory.create(OSSFileSystemFactory.java:105)
    at org.apache.flink.core.fs.PluginFileSystemFactory.create(PluginFileSystemFactory.java:61)
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:468)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:389)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:260)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:396)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: class org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.GroupMappingServiceProvider
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2618)
    ... 29 more

How can Flink use different hive_catalogs to access the Hive of two clusters, offline and real-time?

2021-08-26 Post by Jim Chen
Hi, All
  I'm on Flink 1.13.1. We have two Hadoop clusters, an offline one and a real-time one. Jobs running on the real-time cluster need to access the offline cluster through a hive_catalog.
  Following the official example, I configured different hive-conf-dir paths for the offline and real-time catalogs, but I keep getting errors such as:
2021-08-27 13:50:22,902 INFO  org.apache.hadoop.ipc.Client
[] - Retrying connect to server:
wx12-dsjrt-master001/xx.xx.xx.xx:8050. Already tried 6 time(s); retry
policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=50, sleepTime=1000
MILLISECONDS)
  How do you solve this kind of problem in production? Many thanks!


Re:Re: table.exec.state.ttl

2021-08-26 Post by 李航飞
Hello:

If I want to set mini-batching and the state value's TTL on the execution environment, how should I write it?

Like this: conf.setString("exec.state.ttl","15 s");
or like this: conf.setString("stream.exec.state.ttl","15 s");

On 2021-08-26 19:05:07, Caizhi Weng wrote:
>Hi!
>
>Table-layer options belong in the table environment's table config; setting them
>on the execution environment has no effect.
>
>李航飞 wrote on Thu, Aug 26, 2021, 19:02:
>
>> [quoted message trimmed; identical to the original post "table.exec.state.ttl" below]


Re: On Flink 1.12, how can Flink SQL write to multiple sinks in one transaction?

2021-08-26 Post by jie han
Hi:
You could try using Flink CDC to write into the second Kafka topic.

悟空 wrote on Thu, Aug 26, 2021, 13:54:

> I'm currently using flink-connector-kafka_2.11 and flink-connector-jdbc_2.11.
> To test, I started the job, then dropped the target MySQL table (or removed a
> required column), and sent one Kafka record. That raises a
> java.sql.BatchUpdateException and retries 3 times.
> Yet the Kafka sink still succeeds. On the Kafka side I enabled 'sink.semantic' =
> 'exactly-once', and a downstream consumer reading with --isolation-level
> read_committed can still read the data. So the DB sink fails while the Kafka
> sink succeeds, and the Flink job itself does not fail.
>
>
>
>
> -- Original --
> From: "user-zh" <tsreape...@gmail.com>
> Date: Thu, Aug 26, 2021, 13:25 PM
> To: "user-zh"
> Subject: Re: On Flink 1.12, how can Flink SQL write to multiple sinks in one transaction?
>
>
>
> Hi!
>
> If the statement set already contains multiple INSERT statements, then the
> writes to the Kafka table and to the DB run in the same job. If the DB write
> fails, the resulting exception should fail the job. What exactly does "the DB
> write fails but Kafka is still written" look like here?
>
> Alternatively, consider splitting this into two jobs: the first writes the data
> to the DB, and the second reads from the DB and writes to Kafka. To capture the
> DB changes, take a look at the Flink CDC connector [1].
>
> [1] https://github.com/ververica/flink-cdc-connectors
>
> 悟空 wrote:
> > Could you elaborate? What does statement set [1] mean? I add multiple SQL
> > statements via StatementSet.addInsertSql and then call execute().
> >
> > -- Original --
> > From: "user-zh" <fskm...@gmail.com>
> > Date: Thu, Aug 26, 2021, 12:36 PM
> > To: "user-zh"
> > Subject: Re: On Flink 1.12, how can Flink SQL write to multiple sinks in one transaction?
> >
> > Do you mean a statement set [1]?
> >
> > [1]
> > https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sqlclient/#execute-a-set-of-sql-statements
> >
> > 悟空 wrote:
> > > hi all:
> > > I'm building features on Flink 1.12 SQL and ran into a problem. In one
> > > transaction I want to first write Kafka-sourced data into a MySQL table,
> > > and only after that write succeeds, write into a Kafka table. If the DB
> > > write fails, do not write to Kafka.
> > > The statements look like this:
> > > insert into db_table_sink select * from kafka_source_table;
> > > insert into kafka_table_sink select * from kafka_source_table;
> > >
> > > Does Flink SQL have a way to do this? In my tests, when the DB write fails
> > > Kafka is still written, and the Flink program does not fail.
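The statement-set approach discussed in this thread can be sketched as follows (table DDL omitted; the table names follow the thread, and this assumes Flink 1.12's Table API):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class MultiSinkStatementSet {
    public static void main(String[] args) {
        TableEnvironment tEnv =
            TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // CREATE TABLE DDL for kafka_source_table, db_table_sink and
        // kafka_table_sink is omitted here.
        StatementSet set = tEnv.createStatementSet();
        set.addInsertSql("INSERT INTO db_table_sink SELECT * FROM kafka_source_table");
        set.addInsertSql("INSERT INTO kafka_table_sink SELECT * FROM kafka_source_table");
        // Both INSERTs run in one job; per the reply above, a failure in either
        // sink should fail the whole job (but the sinks do not share one
        // cross-system transaction).
        set.execute();
    }
}
```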


Re: 【Announce】Zeppelin 0.10.0 is released, Flink on Zeppelin Improved

2021-08-26 文章 Till Rohrmann
Cool, thanks for letting us know Jeff. Hopefully, many users use Zeppelin
together with Flink.

Cheers,
Till

On Thu, Aug 26, 2021 at 4:47 AM Leonard Xu  wrote:

> Thanks Jeff for the great work !
>
> Best,
> Leonard
>
> 在 2021年8月25日,22:48,Jeff Zhang  写道:
>
> Hi Flink users,
>
> We (Zeppelin community) are very excited to announce that Zeppelin 0.10.0 is
> officially released. In this version, we made several improvements to the Flink
> interpreter. Here are the main features of Flink on Zeppelin:
>
>- Support multiple versions of Flink
>- Support multiple versions of Scala
>- Support multiple languages
>- Support multiple execution modes
>- Support Hive
>- Interactive development
>- Enhancement on Flink SQL
>- Multi-tenancy
>- Rest API Support
>
> Take a look at this document for more details:
> https://zeppelin.apache.org/docs/0.10.0/interpreter/flink.html
> The quickest way to try Flink on Zeppelin is via its docker image
> https://zeppelin.apache.org/docs/0.10.0/interpreter/flink.html#play-flink-in-zeppelin-docker
>
> Besides these, here's a blog about how to run the Flink SQL Cookbook on
> Zeppelin, an easy way to learn Flink SQL:
> https://medium.com/analytics-vidhya/learn-flink-sql-the-easy-way-d9d48a95ae57
>
> Hope it would be helpful for you and welcome to join our community to
> discuss with others. http://zeppelin.apache.org/community.html
>
>
> --
> Best Regards
>
> Jeff Zhang
>
>
>


Re: Unsubscribe

2021-08-26 Post by Caizhi Weng
Hi!

To unsubscribe from the Chinese mailing list, send an email with any content to user-zh-unsubscr...@flink.apache.org. For how to unsubscribe from the other mailing lists, see
https://flink.apache.org/community.html#mailing-lists

William王卫光 wrote on Thu, Aug 26, 2021, 17:23:

> Re: Unsubscribe
>
> -- Original --
> From: "陈军" <17688550...@163.com>
> Date: Thu, Aug 26, 2021 04:58 PM
> To: "user-zh"
> Subject: Unsubscribe
>
> Unsubscribe
>
> 陈军
> Email: 17688550...@163.com

Re: Flink on native K8s: frequent JobManager restarts caused by too old resource version

2021-08-26 Post by Caizhi Weng
Hi!

io.fabric8.kubernetes.client.KubernetesClientException: too old resource
version is likely caused by an unstable network between the Pod and the APIServer. This has been fixed in 1.12.2; see
https://issues.apache.org/jira/browse/FLINK-20417

We recommend upgrading to the latest Flink 1.12.x release.

blue wrote on Thu, Aug 26, 2021, 17:30:

> Hi,
>    flink 1.12.1 on native K8s, application mode. What could be causing the
> JobManager to restart frequently? The error log is below:
> [log trimmed; identical to the one in the original post below]

Re: table.exec.state.ttl

2021-08-26 Post by Caizhi Weng
Hi!

Table-layer options belong in the table environment's table config; setting them
on the execution environment has no effect.
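As a minimal sketch of the above (option keys and values are taken from the quoted message; this assumes Flink 1.13's Table API bridge):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TableTtlConfig {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Effective: table.* options go on the TableEnvironment's TableConfig.
        Configuration tableConf = tEnv.getConfig().getConfiguration();
        tableConf.setString("table.exec.mini-batch.enabled", "true");
        tableConf.setString("table.exec.mini-batch.allow-latency", "15 s");
        tableConf.setString("table.exec.mini-batch.size", "50");
        tableConf.setString("table.exec.state.ttl", "15 s");
        // Setting the same keys via env.configure(...) is not read by the
        // table planner, which matches the behavior reported below.
    }
}
```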

李航飞 wrote on Thu, Aug 26, 2021, 19:02:

> [quoted message trimmed; identical to the original post "table.exec.state.ttl" below]


table.exec.state.ttl

2021-08-26 Post by 李航飞
Configuration conf = new Configuration();
conf.setString("table.exec.mini-batch.enabled","true");
conf.setString("table.exec.mini-batch.allow-latency","15s");
conf.setString("table.exec.mini-batch.size","50");
conf.setString("table.exec.state.ttl","15 s");
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
execEnv.configure(conf, this.getClass().getClassLoader());
EnvironmentSettings setting = ...
StreamTableEnvironment.create(execEnv, setting);
This is on Flink 1.13.2. With the mini-batch configuration above on a Kafka stream, the settings appear to have no effect at all.
Never mind the allow-latency issue from last time; this time I set table.exec.state.ttl
to 15 seconds, waited one minute, and sent data again, but the result still accumulated instead of starting over from 0.
The program is executed via StatementSet.execute().

Re: CPU spikes caused by Flink window computation, cascaded windows, and heavy post-window operators

2021-08-26 Post by yidan zhao
Reading the source just now, I found that a WindowStagger mechanism was apparently introduced into TumblingEventTimeWindows in 1.12.0.
At a glance it is similar in spirit to my option 3, but judging from the code and its docs, it only works at the granularity of an operator's partitions, i.e. each
parallel instance supports just a single offset value.
My option 3 is a key-hash scheme that consistently spreads each key to a fixed offset: the key is hashed to a value in
[0, windowSize) which is used as that key's offset.
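For reference, a sketch of the built-in partition-level staggering mentioned above (assuming Flink 1.12+; the source and key below are placeholders):

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowStagger;
import org.apache.flink.streaming.api.windowing.time.Time;

public class StaggeredWindowJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder source; a real job would read from Kafka etc.
        DataStream<String> events = env
            .fromElements("a", "b", "a")
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<String>forMonotonousTimestamps()
                    .withTimestampAssigner((e, ts) -> System.currentTimeMillis()));

        events
            .keyBy(v -> v)
            // WindowStagger.RANDOM picks one offset per parallel subtask, so all
            // keys on a subtask share the same stagger -- the partition-level
            // limitation noted above, unlike a per-key offset in [0, windowSize).
            .window(TumblingEventTimeWindows.of(
                Time.minutes(1), Time.seconds(0), WindowStagger.RANDOM))
            .reduce((a, b) -> a + b)
            .print();

        env.execute("staggered-windows");
    }
}
```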

yidan zhao wrote on Thu, Aug 26, 2021, 17:33:

> Yuque doc: https://www.yuque.com/sixhours-gid0m/ls9vqu/nhgrpw
>
> If anyone has done something similar, please share what worked. Also feel free
> to check whether my current options match my expectations.
>


CPU spikes caused by Flink window computation, cascaded windows, and heavy post-window operators

2021-08-26 Post by yidan zhao
Yuque doc: https://www.yuque.com/sixhours-gid0m/ls9vqu/nhgrpw

If anyone has done something similar, please share what worked. Also feel free to check whether my current options match my expectations.


Flink on native K8s: frequent JobManager restarts caused by too old resource version

2021-08-26 Post by blue
Hi,
   flink 1.12.1 on native K8s, application mode. What could be causing the JobManager to restart frequently? The error log is below:


2021-08-26 12:41:12,891 DEBUG org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTrackerImpl [] - Processing cluster partition report from task executor tb-bipo-upload-file-info-binlog2mongo-taskmanager-50-1: PartitionReport{entries=[]}
2021-08-26 12:41:13,082 DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - Connecting websocket ... io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@7799909e
2021-08-26 12:41:13,085 DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - WebSocket successfully opened
2021-08-26 12:41:13,093 ERROR org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl [] - Leader Election Service encountered a fatal error.
org.apache.flink.runtime.leaderelection.LeaderElectionException: Error while watching the ConfigMap tb-bipo-upload-file-info-binlog2mongo-ef82295e3efde28d7b424565606753e0-jobmanager-leader
    at org.apache.flink.kubernetes.highavailability.KubernetesLeaderElectionDriver$ConfigMapCallbackHandlerImpl.handleFatalError(KubernetesLeaderElectionDriver.java:251) [flink-dist_2.12-1.12.1.jar:1.12.1]
    at org.apache.flink.kubernetes.kubeclient.resources.AbstractKubernetesWatcher.onClose(AbstractKubernetesWatcher.java:48) [flink-dist_2.12-1.12.1.jar:1.12.1]
    at io.fabric8.kubernetes.client.utils.WatcherToggle.onClose(WatcherToggle.java:56) [flink-dist_2.12-1.12.1.jar:1.12.1]
    at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager.closeEvent(WatchConnectionManager.java:367) [flink-dist_2.12-1.12.1.jar:1.12.1]
    at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager.access$700(WatchConnectionManager.java:50) [flink-dist_2.12-1.12.1.jar:1.12.1]
    at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259) [flink-dist_2.12-1.12.1.jar:1.12.1]
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.onReadMessage(RealWebSocket.java:323) [flink-dist_2.12-1.12.1.jar:1.12.1]
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.readMessageFrame(WebSocketReader.java:219) [flink-dist_2.12-1.12.1.jar:1.12.1]
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:105) [flink-dist_2.12-1.12.1.jar:1.12.1]
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274) [flink-dist_2.12-1.12.1.jar:1.12.1]
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214) [flink-dist_2.12-1.12.1.jar:1.12.1]
    at org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:206) [flink-dist_2.12-1.12.1.jar:1.12.1]
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32) [flink-dist_2.12-1.12.1.jar:1.12.1]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_292]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_292]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]
Caused by: io.fabric8.kubernetes.client.KubernetesClientException: too old resource version: 2137353712 (2137353732)
    ... 11 more
2021-08-26 12:41:13,093 DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - Force closing the watch io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@7799909e
2021-08-26 12:41:13,093 DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - Ignoring duplicate firing of onClose event
2021-08-26 12:41:13,094 DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - WebSocket close received. code: 1000, reason:
2021-08-26 12:41:13,094 DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - Ignoring onClose for already closed/closing websocket
2021-08-26 12:41:13,095 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint.
org.apache.flink.util.FlinkException: JobMaster for job ef82295e3efde28d7b424565606753e0 failed.

Re: Unsubscribe

2021-08-26 Post by William王卫光
Re: Unsubscribe

-- Original --
From: "陈军" <17688550...@163.com>
Date: Thu, Aug 26, 2021 04:58 PM
To: "user-zh"

Unsubscribe

2021-08-26 Post by 陈军
Unsubscribe

陈军
Email: 17688550...@163.com