退订

2021-08-29 文章 罗海芳




| |
罗海芳
|
|
15678617...@163.com
|
签名由网易邮箱大师定制

回复: flink oss ha

2021-08-29 文章 dker eandei
您好:
 附件是使用oss作高可用时的报错,以下是启动flink时的脚本:

../bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=flink-session-1 \
-Dkubernetes.container.image=test/flink:1.13.2-scala_2.12-oss \
-Dkubernetes.container.image.pull-policy=Always \
-Dkubernetes.namespace=flink-session \
-Dkubernetes.service-account=flink-session-sa \
-Dkubernetes.rest-service.exposed.type=ClusterIP \
-Dtaskmanager.numberOfTaskSlots=6 \
-Djobmanager.memory.process.size=1024m \
-Dtaskmanager.memory.process.size=2048m \
-Dkubernetes.jobmanager.cpu=1 \
-Dkubernetes.taskmanager.cpu=2 \
-Dfs.oss.endpoint="http://oss-.local"; \
-Dfs.oss.accessKeyId="j0BAJ" \
-Dfs.oss.accessKeySecret="7mzTPiC4w" \

-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
 \

-Dhigh-availability.storageDir=oss://bucket-logcenter/flink-state/flink-session-recovery
 \

-Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.13.2.jar
 \

-Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.13.2.jar

-邮件原件-
发件人: Yun Tang  
发送时间: 2021年8月30日 11:36
收件人: user-zh@flink.apache.org
主题: Re: flink oss ha

Hi,
你好,图片无法加载,可以直接粘贴文字出来

祝好
唐云

From: dker eandei 
Sent: Friday, August 27, 2021 14:58
To: user-zh@flink.apache.org 
Subject: flink oss ha


您好:

看文档OSS可以用作 FsStatebackend,那么Flink on k8s 
做高可用时,high-availability.storageDir可以配置成oss吗,我试了下,报以下错误:

[cid:image002.png@01D79B53.F4C71E80]



从 Windows 
版邮件发送


2021-08-30 12:21:19,298 INFO  akka.remote.Remoting  
   [] - Remoting started; listening on addresses 
:[akka.tcp://flink@ip:6123]
2021-08-30 12:21:19,547 INFO  
org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils[] - Actor system 
started at akka.tcp://flink@ip:6123
2021-08-30 12:21:21,816 INFO  org.apache.flink.runtime.blob.FileSystemBlobStore 
   [] - Creating highly available BLOB storage directory at 
oss://bucket-logcenter/flink-state/flink-session-recovery/flink-session-1/blob
2021-08-30 12:21:22,136 INFO  
org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss  [] - 
[Server]Unable to execute HTTP request: Not Found
[ErrorCode]: NoSuchKey
[RequestId]: 612C5CC21078CF8B58AB7521
[HostId]: null
2021-08-30 12:21:22,145 INFO  
org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss  [] - 
[Server]Unable to execute HTTP request: Not Found
[ErrorCode]: NoSuchKey
[RequestId]: 612C5CC27BF4BC5C747B6452
[HostId]: null
2021-08-30 12:21:22,467 WARN  
org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss  [] - 
[Server]Unable to execute HTTP request: There are invalid characters in 
parameters.
[ErrorCode]: InvalidArgument
[RequestId]: 612C5CC2EF7A8F7D9E7B4301
[HostId]: oss-.local
[ResponseError]:


  InvalidArgument
  There are invalid characters in parameters.
  612C5CC2EF7A8F7D9E7B4301
  oss-.local
  prefix
  
flink-state/flink-session-recovery/flink-session-1/blob/


2021-08-30 12:21:22,471 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Shutting 
KubernetesSessionClusterEntrypoint down with application status FAILED. 
Diagnostics org.apache.flink.util.FlinkException: Could not create the ha 
services from the instantiated HighAvailabilityServicesFactory 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.
at 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:268)
at 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:124)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:353)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:311)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:239)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:189)
at 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:186)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:600)
at 
org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint.main(KubernetesSe

Re: 如何 Flink 通过不同的hive_catalog,访问离线和实时两套集群的hive?

2021-08-29 文章 Rui Li
你好,

这个看起来是Hadoop的报错,连接失败的这个节点是NameNode么?创建HiveCatalog时可以指定Hadoop的配置文件的,要保证两个HiveCatalog读到的Hadoop配置是正确的。

另外使用多个HiveCatalog需要保证hive版本一致,不同版本的hive没办法一起用

On Fri, Aug 27, 2021 at 3:01 PM Jim Chen 
wrote:

> Hi
>
> 集群上根本没有这个端口,也不知道这个端口是干嘛用的,为啥要连这个。这个是实时集群上的端口
>
> Caizhi Weng  于2021年8月27日周五 下午2:33写道:
>
> > Hi!
> >
> > 看起来是 Flink 集群不能访问到
> wx12-dsjrt-master001/xx.xx.xx.xx:8050,检查一下网络以及这个端口的状态看看?
> >
> > Jim Chen  于2021年8月27日周五 下午1:59写道:
> >
> > > Hi, All
> > >
> > >
> >
> 我是使用的flink1.13.1版本,我们有2套Hadoop集群,离线和实时集群。现在实时集群上的任务,想通过hive_catalog访问离线集群。
> > >   按照官网例子,我分别配置离线和实时的hive-conf-dir的不同路径,发现一只报错。如:
> > > 2021-08-27 13:50:22,902 INFO  org.apache.hadoop.ipc.Client
> > > [] - Retrying connect to server:
> > > wx12-dsjrt-master001/xx.xx.xx.xx:8050. Already tried 6 time(s); retry
> > > policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=50,
> > sleepTime=1000
> > > MILLISECONDS)。
> > >   大家在生产中是如何解决这种问题的呢?非常感谢!
> > >
> >
>


-- 
Best regards!
Rui Li


Re: flink oss ha

2021-08-29 文章 Yun Tang
Hi,
你好,图片无法加载,可以直接粘贴文字出来

祝好
唐云

From: dker eandei 
Sent: Friday, August 27, 2021 14:58
To: user-zh@flink.apache.org 
Subject: flink oss ha


您好:

看文档OSS可以用作 FsStatebackend,那么Flink on k8s 
做高可用时,high-availability.storageDir可以配置成oss吗,我试了下,报以下错误:

[cid:image002.png@01D79B53.F4C71E80]



从 Windows 版邮件发送




Re: table.exec.state.ttl

2021-08-29 文章 Yun Tang
Hi 航飞

可以参照[1] 看是不是类似的问题


[1] https://issues.apache.org/jira/browse/FLINK-23721

祝好
唐云

From: 李航飞 
Sent: Thursday, August 26, 2021 19:02
To: user-zh 
Subject: table.exec.state.ttl

Configuration conf = new Configuration();
conf.setString("table.exec.mini-batch.enabled","true");
conf.setString("table.exec.mini-batch.allow-latency","15s");
conf.setString("table.exec.mini-batch.size","50");
conf.setString("table.exec.state.ttl","15 s");StreamExecutionEnvironment 
execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); 
execEnv.configure(conf,this.getClass().getClassLoader()); EnvironmentSetting 
setting = ... StreamTableEnvironment.create(execEnv,setting); 基于flink1.13.2
微批配置信息如上,kafka流数据,我感觉我这个设置根本没有产生效果
上次那个 allow 也就算了,这次这个 table.exec.state.ttl 
设置了15秒,我等了1分钟,再次输入数据,计算结果还是累加了,没有重0开始计算。
程序是通过StatementSet .execute()执行的


Re: flink系统配置中的io.tmp.dirs里的文件自动清理机制

2021-08-29 文章 Caizhi Weng
Hi!

这些临时文件基本都是网络 shuffle 的数据,Flink 会自动清理不用的数据,理论上不会无限增大。

另外你使用的 state backend 是什么?部分 state backend 比如 rocksdb 的临时文件也会存在这里,此时就和 state
的大小相关。

王春波 <893885...@qq.com.invalid> 于2021年8月30日周一 上午10:26写道:

>
> 请问作业结束是指任务停止么,还是数据处理完成一部分,实时分析job一直在消费kafka数据,临时文件内容会很大,flink也没有什么策略处理一下么?磁盘会被占满的,怎么办呀?
> 还有个问题,临时文件里存储的是数据么?
>
>
> -- 原始邮件 --
> 发件人:
>   "user-zh"
> <
> tsreape...@gmail.com>;
> 发送时间: 2021年8月30日(星期一) 上午10:03
> 收件人: "user-zh"
> 主题: Re: flink系统配置中的io.tmp.dirs里的文件自动清理机制
>
>
>
> Hi!
>
> io.tmp.dirs 里的文件在作业结束后会自动清理,作业运行过程中不应该清理。
>
> 王春波 <893885...@qq.com.invalid> 于2021年8月30日周一 上午9:57写道:
>
> >
> >
> 在运行flink任务时,在flink-conf.yaml配置文件中的io.tmp.dirs里会缓存些临时文件,这些文件有没有清理的机制?在实际运行中该目录里的文件占用巨大的磁盘空间,约40G左右,将其删除后,任务就执行失败了,不知道该如何清理,多指教


?????? flink????????????io.tmp.dirs????????????????????

2021-08-29 文章 ??????
job??kafka??flink
??


--  --
??: 
   "user-zh"



Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中

2021-08-29 文章 Shuo Cheng
你好, 你说的这种控制写入的方式在同一个 Flink SQL job 里是无法实现的. 控制数据是否写入某个
Sink,可以看看是否从逻辑上能在 Sink 前加一个 Filter,从而达到过滤目的;如果 kafka sink 跟 MySQL
表是一种类似级联的关系, 可以考虑先写入 MySQL, 然后另起一个 Job 用 CDC 方式读 MySQL changelog 再写入
Kafka sink.

On 8/26/21, jie han  wrote:
> HI:
> 可以尝试下使用flink cdc 的方式写入到第二个kafka里面呀
>
> 悟空  于2021年8月26日周四 下午1:54写道:
>
>> 我目前用的是flink-connector-kafka_2.11 和 flink-connector-jdbc_2.11,
>> 测试时,我把任务启动好之后,把mysql 中的目标表删除 或 删除必要字段,
>> 之后发送一条kafka数据,会报java.sql.BatchUpdateException 异常,然后重试3次。 
>> 但是接着sink Kafka 是成功的,Kafka端 我开启了 'sink.semantic' = 'exactly-once',
>> 同时下游consumer 使用 --isolation-level read_committed
>> 读取,依旧能成功读取到数据,说明sink
>> db 失败,但是sink kafka成功,同时flink 本身任务不会挂掉。
>>
>>
>>
>>
>> -- 原始邮件 --
>> 发件人:
>>   "user-zh"
>> <
>> tsreape...@gmail.com>;
>> 发送时间: 2021年8月26日(星期四) 中午1:25
>> 收件人: "user-zh">
>> 主题: Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中
>>
>>
>>
>> Hi!
>>
>> 如果 statement set 已经包含了多个 insert 语句,那么写入 kafka 表和写入 db 应该是在同一个作业里进行的。如果写入
>> db
>> 失败,那么产生的 exception 应该会让作业失败才对。这里 db 写入失败但 kafka 依旧写入是什么样的现象?
>>
>> 另外其实可以考虑分成两个作业,第一个作业将数据写入 db,第二个作业从 db 读出数据写入 kafka。关于捕获 db 数据的变化,可以看一下
>> Flink CDC connector[1]
>>
>> [1] https://github.com/ververica/flink-cdc-connectors
>>
>> 悟空 >
>> > 能否详细说下呢,statement set[1] 是什么意思, 我是通过StatementSet.addInsertSql 将多个sql
>> > 加入的,然后执行execute()方法
>> >
>> >
>> >
>> >
>> > -- 原始邮件 --
>> > 发件人:
>> >  
>> "user-zh"
>> >
>> <
>> > fskm...@gmail.com>;
>> > 发送时间: 2021年8月26日(星期四) 中午12:36
>> > 收件人: "user-zh"> >
>> > 主题: Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中
>> >
>> >
>> >
>> > 说的是 statement set [1] 吗 ?
>> >
>> > [1]
>> >
>> >
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sqlclient/#execute-a-set-of-sql-statements
>> >
>> >
>> ;
>> > 悟空 > >
>> > > hi all:&nbsp;
>> > > &nbsp; &nbsp; 我目前基于flink 1.12 sql 来开发功能,
>> 目前遇到一个问题, 我现在想实现
>> > 在一个事务里 先将kafka
>> > > 源的数据写入到一张msyql 表中, 写入成功后再写入一张kafka的表里。如果写入db失败,则不写入kafka 。
>> > > &nbsp; &nbsp;语句类似这种:
>> > > &nbsp; &nbsp;insert into
>> db_table_sink&nbsp;select *
>> > from&nbsp;
>> > > kafka_source_table;
>> > > &nbsp; &nbsp;insert into kafka_table_sink
>> select * from
>> > kafka_source_table;
>> > >
>> > >
>> > > &nbsp; 请问flink SQL 有实现方式吗?
>> 目前经过测试,发现如果db写入失败,但是kafka依旧写入,同时flink
>> > 程序没有挂掉。
>


Re: flink系统配置中的io.tmp.dirs里的文件自动清理机制

2021-08-29 文章 Caizhi Weng
Hi!

io.tmp.dirs 里的文件在作业结束后会自动清理,作业运行过程中不应该清理。

王春波 <893885...@qq.com.invalid> 于2021年8月30日周一 上午9:57写道:

>
> 在运行flink任务时,在flink-conf.yaml配置文件中的io.tmp.dirs里会缓存些临时文件,这些文件有没有清理的机制?在实际运行中该目录里的文件占用巨大的磁盘空间,约40G左右,将其删除后,任务就执行失败了,不知道该如何清理,多指教


Re: flink run -d -m yarn-cluster 提交任务到yarn集群不成功

2021-08-29 文章 Caizhi Weng
Hi!

不太明白“配置到哪一级”是什么含义。export 命令是将变量导出到目前的 shell session,在从 shell logout
之前都有效。如果运行了 export 命令之后再运行 flink run 应该就可行了。

Wayne <1...@163.com> 于2021年8月28日周六 下午8:37写道:

> 我的提交命令
>
>
> ./bin/flink run -d -m yarn-cluster
>
>
> 报错如下
>  The program finished with the following exception:
>
>
> java.lang.IllegalStateException: No Executor found. Please make sure to
> export the HADOOP_CLASSPATH environment variable or have hadoop in your
> classpath. For more information refer to the "Deployment" section of the
> official Apache Flink documentation.
> at
> org.apache.flink.yarn.cli.FallbackYarnSessionCli.isActive(FallbackYarnSessionCli.java:41)
> at
> org.apache.flink.client.cli.CliFrontend.validateAndGetActiveCommandLine(CliFrontend.java:1236)
> at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:234)
> at
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
> at
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
>
>
> 运行命令 hadoop classpath
> @192 flink-1.12.2 % hadoop classpath
>
> /Users//local/hadoop/hadoop-3.2.2/etc/hadoop:/Users//local/hadoop/hadoop-3.2.2/share/hadoop/common/lib/*:/Users//local/hadoop/hadoop-3.2.2/share/hadoop/common/*:/Users//local/hadoop/hadoop-3.2.2/share/hadoop/hdfs:/Users//local/hadoop/hadoop-3.2.2/share/hadoop/hdfs/lib/*:/Users//local/hadoop/hadoop-3.2.2/share/hadoop/hdfs/*:/Users//local/hadoop/hadoop-3.2.2/share/hadoop/mapreduce/lib/*:/Users//local/hadoop/hadoop-3.2.2/share/hadoop/mapreduce/*:/Users//local/hadoop/hadoop-3.2.2/share/hadoop/yarn:/Users//local/hadoop/hadoop-3.2.2/share/hadoop/yarn/lib/*:/Users//local/hadoop/hadoop-3.2.2/share/hadoop/yarn/*:/Users//local/hadoop/hadoop-3.2.2
> 反复配置HADOOP_CLASSPATH 无法生效 官网给出的
> export HADOOP_CLASSPATH=`hadoop classpath`
> 这个 hadoop classpath 具体配置到哪一级
>
>
>
>
>
>
>
>


flink????????????io.tmp.dirs????????????????????

2021-08-29 文章 ??????
??flink??flink-conf.yamlio.tmp.dirs??40G