Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

2021-05-31 文章 HunterXHunter
我遇到过的问题就是开了 增量checkpoint后,checkpoint会越来越大,关闭之后checkpoint就正常了



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink SQL 1.11.3问题请教

2021-05-31 文章 yinghua...@163.com
我们使用Nifi将数据采集到kafka,然后使用Flink处理kafka中的数据,现在Nifi如果将多条数据当做一条记录(写入kafka中的数据格式为csv或者json)写入kafka后(就是一个offset中包含了多条数据),Flink只会处理其中的一条数据?有没有什么办法让Flink都处理一个offset中的多条数据?



yinghua...@163.com


Re: Flink 1.11.2 SQL消费kafka写Hive报错

2021-05-31 文章 Jacob
问题已解决

需要在FLink home的lib中引入kafka connector jar包



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

2021-05-31 文章 yujianbo
感谢大佬的回复!以后优先现在邮箱这边讨论发现问题再去提个issue!
我的 idleStateRetention确实是设置3600秒,我先进行测试看看。




--
Sent from: http://apache-flink.147419.n8.nabble.com/

flink sql cdc 采集mysql binlog 可以保留before,after的字段吗

2021-05-31 文章 董建
flink sql cdc 采集mysql binlog 可以保留before,after的字段吗?
按照官方的例子,定义表结构后,是最新的字段值?
能否同时保留before和after?

Re:datastream union各个topic的数据后,数据有丢失

2021-05-31 文章 13631283359



已经解决了,去掉循环,把每个kafka topic单独处理,再union













在 2021-06-01 08:54:42,"13631283359" <13631283...@163.com> 写道:

大家好,
我最近用datastream union各个topic的数据后,数据有丢失,比单独消费每个topic的数据少很多
代码如下:
/**
* 将多个流合并成一个进行处理 * * @param topics 通过配置文件传入的 Topic 和 消费者GroupID * @param env 
Flink运行环境 * @return 将多个Topic数据流整合成一个流 */ def getUnionConsumer(topics: 
List[String], env: StreamExecutionEnvironment): DataStream[String] = { var 
total: DataStream[String] = null for (str <- topics) { val topicName = 
str.split(":")(0) val groupId = str.split(":")(1) val source_data = 
getSourceData(topicName, groupId, env) if (total != null) { total = 
total.union(source_data) } else { total = source_data } } total }





 

Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

2021-05-31 文章 Yun Tang
Hi,

先确定一下,你的 idleStateRetention 是 3600秒?其次,要想看是否所有数据均有用,可以利用 
Checkpoints.loadCheckpointMeta [1] 去加载你所保留的checkpoint目录下的 _metadata 
文件,然后与当前checkpoint目录下的文件作对比,看是否存在大量的未删除旧文件。

目前仅凭你的描述和一段SQL代码其实很难判断。
可能存在的原因有:

  1.  单次checkpoint文件数目过多,JM单点删除跟不上相关速度
  2.  整体checkpoint 
size不大,RocksDB的compaction频率较低,导致相关包含过期数据的旧文件并没有被及时删除,仍然存在checkpoint目录下(保留多个checkpoint同样会放大该问题)

所以还是最好先分析已有checkpoint的meta文件,再与目前的实际情况做对比。

另外,Flink 
JIRA其实不是一个很好的直接问问题的地方,尤其是这种并不明确的问题,更妥善的方式还是先在邮件列表讨论,具体定位到原因后,再创建相关详细的ticket。

[1] 
https://github.com/apache/flink/blob/b582991b8b2b8dadb89e71d5002c4a9cc2055e34/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L99

祝好
唐云

From: yujianbo <15205029...@163.com>
Sent: Tuesday, June 1, 2021 10:51
To: user-zh@flink.apache.org 
Subject: Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

没有更好的方式吗



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink 1.11.2 SQL消费kafka写Hive报错

2021-05-31 文章 Jacob
Thank you for your reply!

您所说的kafka connector 是*flink-connector-kafka_2.11*
这个依赖吗?这个是Datastream所用的依赖,我在pom中已经引入了* flink-sql-connector-kafka_2.11*依赖了。
我试了引入* flink-connector-kafka_2.11*,但还是会报错的。



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink 1.11.2 SQL消费kafka写Hive报错

2021-05-31 文章 Jacob
Thank you for your reply!

您所说的kafka connector 是* flink-connector-kafka_2.11*
这个依赖吗?这个是Datastream所用的依赖,我在pom中已经引入了 *flink-sql-connector-kafka_2.11*依赖了。
我试了引入* flink-connector-kafka_2.11*,但还是会报错的。



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

2021-05-31 文章 yujianbo
没有更好的方式吗,这样治标不治本,那我大状态任务会有很多问题



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

2021-05-31 文章 yujianbo
没有更好的方式吗



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

2021-05-31 文章 HunterXHunter
关闭 增量checkpoint



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink 1.11.2 SQL消费kafka写Hive报错

2021-05-31 文章 LakeShen
Hi Jacob,

Maybe you miss the kafka connector dependency in your pom,
you could refer to this url :
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html

Best,
LakeShen

Jacob <17691150...@163.com> 于2021年6月1日周二 上午9:54写道:

> Dear All,
>
> 我在使用Flink SQL消费kafka并写入hive时报错,代码和报错信息如下
>
> 其中,Flink 版本为1.11.2,kafka connector使用的是flink-sql-connector-kafka。
> 搜索了很久,依然没有解决,相同的sql语句在flink sql clients可以执行,但在Java代码中,提交到hadoop集群后就报错如下:
>
> 请指教
>
> *Java Code*
>
> TableResult tableResult = tableEnvironment.executeSql("DROP TABLE IF EXISTS
> user_behavior_kafka_table");
> tableResult.print();
> TableResult tableResult2 = tableEnvironment.executeSql(
> "CREATE TABLE user_behavior_kafka_table
> (\r\n" +
> "   `user_id` STRING,\r\n" +
> "   `item_id` STRING\r\n" +
> " ) WITH (\r\n" +
> "   'connector' = 'kafka',\r\n" +
> "   'topic' = 'TestTopic',\r\n" +
> "   'properties.bootstrap.servers' =
> 'localhost:9092',\r\n" +
> "   'properties.group.id' =
> 'consumerTest',\r\n" +
> "   'scan.startup.mode' =
> 'earliest-offset',\r\n" +
> "   'format' = 'json'\r\n" +
> ")");
> tableResult2.print();
>
>
> // 数据写入
> tableEnvironment.getConfig().setSqlDialect(SqlDialect.HIVE);
> tableEnvironment.executeSql(
> "INSERT INTO user_behavior_hive_table SELECT user_id,
> item_id FROM user_behavior_kafka_table");
>
>
> *POM File*
>
> 
> org.apache.flink
> flink-json
> ${flink.version}
> 
>
> 
> org.apache.flink
> flink-streaming-java_2.11
> ${flink.version}
> provided
> 
>
> 
> org.apache.flink
>
> flink-clients_${scala.binary.version}
> ${flink.version}
> provided
> 
>
> 
> org.apache.flink
>
>
> flink-table-api-java-bridge_${scala.binary.version}
> ${flink.version}
> provided
> 
>
> 
> org.apache.flink
>
> flink-table-planner-blink_${scala.binary.version}
> ${flink.version}
> provided
> 
>
> 
> org.apache.flink
>
> flink-sql-connector-kafka_${scala.binary.version}
> ${flink.version}
> 
>
>
> 
> org.apache.flink
> flink-shaded-hadoop-2-uber
> 2.7.5-10.0
> provided
> 
>
> 
> org.apache.flink
> flink-connector-hive_2.11
> ${flink.version}
> provided
> 
>
> 
> org.apache.hive
> hive-exec
> ${hive.version}
> provided
> 
>
>
> *Error Messge*
>
> Caused by: org.apache.flink.table.api.ValidationException: Cannot discover
> a
> connector using option ''connector'='kafka''.
> at
>
> org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)
> ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118)
> ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.connectors.hive.HiveDynamicTableFactory.createDynamicTableSource(HiveDynamicTableFactory.java:81)
> ~[flink-connector-hive_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122)
> ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135)
> ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
> ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
> ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
> 

Flink 1.11.2 SQL消费kafka写Hive报错

2021-05-31 文章 Jacob
Dear All,

我在使用Flink SQL消费kafka并写入hive时报错,代码和报错信息如下

其中,Flink 版本为1.11.2,kafka connector使用的是flink-sql-connector-kafka。
搜索了很久,依然没有解决,相同的sql语句在flink sql clients可以执行,但在Java代码中,提交到hadoop集群后就报错如下:

请指教

*Java Code*

TableResult tableResult = tableEnvironment.executeSql("DROP TABLE IF EXISTS
user_behavior_kafka_table");
tableResult.print();
TableResult tableResult2 = tableEnvironment.executeSql(
"CREATE TABLE user_behavior_kafka_table (\r\n" 
+ 
"   `user_id` STRING,\r\n" + 
"   `item_id` STRING\r\n" + 
" ) WITH (\r\n" + 
"   'connector' = 'kafka',\r\n" + 
"   'topic' = 'TestTopic',\r\n" + 
"   'properties.bootstrap.servers' = 
'localhost:9092',\r\n" + 
"   'properties.group.id' = 
'consumerTest',\r\n" + 
"   'scan.startup.mode' = 
'earliest-offset',\r\n" + 
"   'format' = 'json'\r\n" + 
")");
tableResult2.print();


// 数据写入
tableEnvironment.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnvironment.executeSql(
"INSERT INTO user_behavior_hive_table SELECT user_id,
item_id FROM user_behavior_kafka_table");


*POM File*


org.apache.flink
flink-json
${flink.version}



org.apache.flink
flink-streaming-java_2.11
${flink.version}
provided



org.apache.flink

flink-clients_${scala.binary.version}
${flink.version}
provided



org.apache.flink

flink-table-api-java-bridge_${scala.binary.version}
${flink.version}
provided



org.apache.flink

flink-table-planner-blink_${scala.binary.version}
${flink.version}
provided



org.apache.flink

flink-sql-connector-kafka_${scala.binary.version}
${flink.version}




org.apache.flink
flink-shaded-hadoop-2-uber
2.7.5-10.0
provided



org.apache.flink
flink-connector-hive_2.11
${flink.version}
provided



org.apache.hive
hive-exec
${hive.version}
provided



*Error Messge*

Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a
connector using option ''connector'='kafka''.
at
org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)
~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118)
~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.connectors.hive.HiveDynamicTableFactory.createDynamicTableSource(HiveDynamicTableFactory.java:81)
~[flink-connector-hive_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122)
~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135)
~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
at
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)

Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

2021-05-31 文章 yujianbo
有没有大佬帮忙看看



--
Sent from: http://apache-flink.147419.n8.nabble.com/


datastream union各个topic的数据后,数据有丢失

2021-05-31 文章 13631283359
大家好,
我最近用datastream union各个topic的数据后,数据有丢失,比单独消费每个topic的数据少很多
代码如下:
/**
* 将多个流合并成一个进行处理 * * @param topics 通过配置文件传入的 Topic 和 消费者GroupID * @param env 
Flink运行环境 * @return 将多个Topic数据流整合成一个流 */ def getUnionConsumer(topics: 
List[String], env: StreamExecutionEnvironment): DataStream[String] = { var 
total: DataStream[String] = null for (str <- topics) { val topicName = 
str.split(":")(0) val groupId = str.split(":")(1) val source_data = 
getSourceData(topicName, groupId, env) if (total != null) { total = 
total.union(source_data) } else { total = source_data } } total }

Flink state processor API with Avro data type

2021-05-31 文章 Min Tan
大家好,

我使用 Flink 1.10.1 并尝试使用 Flink State Processor API 从Savepoint读取 flink state 状态。
当状态Type 是普通 Java type或 Java POJOs时, 运行良好。

当 Avro 生成的 Java class 用作状态类型 state type时,不工作。
在这种Avro class情况下是否需要额外的序列化 serializers?

 
谢谢
谭民

Re: 回复:Flink sql的state ttl设置

2021-05-31 文章 LakeShen
或许你可以参考这个:
[image: image.png]

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/config/

Best,
LakeShen

chenchencc <1353637...@qq.com> 于2021年5月28日周五 下午4:30写道:

> 想问下state ttl能针对单表设置吗?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: rocksdb状态后端最多保留checkpoints问题

2021-05-31 文章 LakeShen
在增量 checkpoint 下,你可以简单理解状态几乎都存在 checkpoint 目录中的 shared 目录,
所以即使清理 checkpoint,也只是先将这次 checkpoint 引用的相关文件句柄的引用数减1,
只有一个文件没有 checkpoint 引用它时,才会真正删除该文件。

Best,
LakeShen.

刘建刚  于2021年5月28日周五 下午7:03写道:

> 增量快照的原理是sst文件共享,系统会自动帮助你管理sst文件的引用,类似java的引用,并不会因为一个快照删除了就会把实际的数据删除掉。
> 也就不会发生你说的情况
>
> tison  于2021年5月28日周五 上午1:47写道:
>
> > rocksdb 增量 checkpoint 不是你这么理解的,总的不会恢复不了。原因可以参考下面的材料
> >
> > -
> >
> https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html
> > 官方 blog 介绍
> > - https://www.bilibili.com/video/BV1db411e7x2 施博士的介绍,大概 24 分钟开始讲
> >
> > Best,
> > tison.
> >
> >
> > casel.chen  于2021年5月27日周四 下午11:35写道:
> >
> > > 作业中使用了增量式的rocksdb状态后端,请问如果再设置了最多保留checkpoints个数的话,会不会造成rocksdb
> > >
> >
> state恢复失败?例如,假设完整的state恢复需要最近10个chk,但因为设置了最多保留checkpoints个数为5的话,状态是不是恢复不了了?
> >
>


Re: 流与流 left join

2021-05-31 文章 LakeShen
Hi,或许 Flink SQL  interval join 能够满足你的需求。

Best,
LakeShen.

Shuo Cheng  于2021年5月31日周一 下午12:10写道:

> state ttl 只能是全局算子维度, table.exec.state.ttl
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: 求教:动态字段的处理

2021-05-31 文章 LakeShen
看下你的 Flink 版本是多少,如果是高版本的话,社区有提供 DataStream 的 HBase Sink。

Best,
LakeShen.

Zorro  于2021年5月31日周一 下午2:41写道:

> 由于你的DDL是变化的,无法提前预知所有字段,所以首先可以确定的是这个场景无法使用Flink SQL解决。
>
> 如果使用DataStream解决的话是可行的,唯一可能存在的问题就是目前社区没有提供DataStream的HBase sink。
> 如果你需要在DataStream中使用HBase sink的话,可能需要你自定义一个HBase sink或者基于社区的HBase SQL
> connector做一些更改。不过这些更改是需要在Java代码层面的。
>
> 至于其他的处理逻辑可以用pyFlink很方便的改写。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


不同的程序在同一时间段报同一个异常

2021-05-31 文章 mq sun
大家好:
  最近在生产中,不同项目组的两个flink程序在同一时间段都报下面异常
ERROR org.apache.flink.runtime.blob.BlobServerConnection  -Error while
excuting Blob connection
.
.
.
org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException
:Adjusted frame length exceeds 10485760: 1347375960 -discarded

上面的报错信息,刚开始知道其中一个报错的时候,怀疑是状态过大导致的。但是后来又有另外一个程序报同样的错误,现在不确定是什么问题导致,请问大佬这个可能是什么原因


Re: flink sink kafka from checkpoint run failed

2021-05-31 文章 tianxy
我也遇到了 请问你解决了没




--
Sent from: http://apache-flink.147419.n8.nabble.com/


怎么关闭operatorChaining

2021-05-31 文章 McClone
版本flink 1.11.2
 EnvironmentSettings build = 
EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tEnv = TableEnvironment.create(build);

Re: Re: flink on yarn 模式下,yarn集群的resource-manager切换导致flink应用程序重启,并且未从最后一次checkpoint恢复

2021-05-31 文章 Yang Wang
HA在ZK里面记录了最后一次成功的checkpoint counter和地址,没有启用HA的话,就是从指定的savepoint恢复的。


Best,
Yang

刘建刚  于2021年5月28日周五 下午6:51写道:

> 那应该是master failover后把快照信息丢失了,ha应该能解决这个问题。
>
> 董建 <62...@163.com> 于2021年5月28日周五 下午6:24写道:
>
> > 稳定复现
> > checkpoint 正常生成,在web ui和hdfs目录里边都可以确认。
> > 我们jobmanager没有做ha,不知道是否是这个原因导致的?
> > 日志里边能看到是从指定的-s恢复的,没有指定-s的时候,重启的时候也并没有使用最新的checkpoint文件。
> > 目前这个问题困扰了我很久,也没有一个好的思路,下一步先把ha搞起来再试试。
> > >> org.apache.flink.configuration.GlobalConfiguration   [] -
> > Loading
> > >> configuration property: execution.savepoint.path,
> > >>
> >
> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > 在 2021-05-28 18:15:38,"刘建刚"  写道:
> > >这种情况是不符合预期的。请问通过以下步骤可以稳定复现吗?
> > >1、从savepoint恢复;
> > >2、作业开始定期做savepoint;
> > >3、作业failover。
> > >如果是的话,可能需要排查下checkpoint 文件是否存在,zookeeper上是否更新。
> > >如果还是有问题,需要通过日志来排查了。
> > >
> > >董建 <62...@163.com> 于2021年5月28日周五 下午5:37写道:
> > >
> > >> 我遇到的问题现象是这样的
> > >>
> > >>
> > >>
> > >>
> > >> 1、flink版本flink-1.12.2,启动命令如下,指定-s是因为job有做过cancel,这里重启。
> > >>
> > >>
> > >>
> > >>
> > >> flink run -d -s
> > >>
> >
> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100
> > >> -t yarn-per-job -m yarn-cluser -D yarn.application.name=
> > >> /tmp/flink-1.0-SNAPSHOT.jar  -c com.test.myStream  --profile prod
> > >>
> > >>
> > >>
> > >>
> > >> 2、flink-conf.xml
> > >>
> > >>
> > >>
> > >>
> > >> state.checkpoints.dir: hdfs:///user/flink/checkpoints/default
> > >>
> > >>
> > >>
> > >>
> > >> 3、代码checkpoint设置
> > >>
> > >>
> > >>
> > >>
> > >>StreamExecutionEnvironment env =
> > >> StreamExecutionEnvironment.getExecutionEnvironment();
> > >>
> > >>
> > >>
> > >>
> > >>env.setRestartStrategy(RestartStrategies.fixedDelayRestart(100,
> > >> 10));
> > >>
> > >>
> > >>
> > >>
> > >>CheckpointConfig checkpointConfig = env.getCheckpointConfig();
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> >
> checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> > >>
> > >>
> > >>
> > >>
> > >>env.enableCheckpointing(1 * 60 * 1000);
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> > >>
> > >>
> > >>
> > >>
> > >>checkpointConfig.setTolerableCheckpointFailureNumber(100);
> > >>
> > >>
> > >>
> > >>
> > >>checkpointConfig.setCheckpointTimeout(60 * 1000);
> > >>
> > >>
> > >>
> > >>
> > >>checkpointConfig.setMaxConcurrentCheckpoints(1);
> > >>
> > >>
> > >>
> > >>
> > >> 4、问题现象
> > >>
> > >>
> > >>
> > >>
> > >> a)运维同事切换yarn
> > >>
> >
> resourc-manager,我的flink任务也会重启(重启后application-id和job-id并没有改变),但是jobmanager和taskmanager更换了机器
> > >>
> > >>
> > >>
> > >>
> > >>
> > >> b)我的flink任务每隔1分钟做一个checkpoint,假如任务在上次重启后又运行了一段时间,checkpoint已经到了chk-200
> > >>
> > >>
> > >>
> > >>
> > >> c)集群的同事再次切换resource-manager,这个时候预期是flink任务自动从chk-200
> > >> restore,从日志中看还是从chk-100 restore的。
> > >>
> > >>
> > >>
> > >>
> > >> d)任务大致逻辑是通过flink-mysql-cdc消费binlog,DebeziumSourceFunction
> > >> sourceMilApplysLogStream = MySQLSource.builder()
> > >>
> > >>
> > >>
> > >>
> > >>   重启导致了再次从chk-100的binlog pos开始消费,即了chk-100~chk-200之间的binlog重复消费
> > >>
> > >>
> > >>
> > >>
> > >> e)日志中有打印如下,难道是因为上次我启动flink任务的时候用了-s,所以不会去自动找最新的checkpoint重启吗?
> > >>
> > >>
> > >>
> > >>
> > >> 2021-05-24 16:49:50,398 INFO
> > >> org.apache.flink.configuration.GlobalConfiguration   [] -
> > Loading
> > >> configuration property: execution.savepoint.path,
> > >>
> >
> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100
> > >>
> > >>
> > >>
> > >> 预期是任务重启后自动从最新的checkpoint开始继续消费,即从chk-200开始消费
> > >>
> > >>
> > >>
> > >>
> > >> 现在的问题是任务重启后总是从flink -s指定的checkpoint恢复的。
> >
>


Re: flink 1.13 k8s native 启动找不到 KubernetesSessionClusterEntrypoint

2021-05-31 文章 Yang Wang
你可以describe一下失败JM的pod发出来,看看生成的启动命令是不是正确的


Best,
Yang

fz  于2021年5月28日周五 下午10:09写道:

> 镜像: flink:1.13.0-scala_2.11
>
> sed: cannot rename /opt/flink/conf/sed1yRdDY: Device or resource busy
> sed: cannot rename /opt/flink/conf/sed03zP3W: Device or resource busy
> /docker-entrypoint.sh: line 73: /opt/flink/conf/flink-conf.yaml: Read-only
> file system
> sed: cannot rename /opt/flink/conf/sedFtORA0: Device or resource busy
> mv: cannot move '/opt/flink/conf/flink-conf.yaml.tmp' to
> '/opt/flink/conf/flink-conf.yaml': Device or resource busy
> + /usr/local/openjdk-8/bin/java -classpath '/opt/flink/lib/*' -Xms30720m
> -Xmx30720m -Dlogback.configurationFile=file:/opt/flink/conf/logback.xml
> -Dlog4j.configuration=file:/opt/flink/conf/log4j.properties
> org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint
> Error: Could not find or load main class
> org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

2021-05-31 文章 yujianbo
一、环境:
1、版本:1.12.0
2、flink sql
3、已经设置了setIdleStateRetention 为1小时
4、状态后端是rocksDB, 增量模式
5、源数据没有数据激增情况,任务已经跑了两天

二、详情
具体sql见第三大点,就是普通的group by统计的
sql,然后设置setIdleStateRetention(3600)。目前观察两天了,checkpoint目录下面的shared文件夹的大小一直在增长,然后看文件夹里的文件是在一直更新,最早的文件也会消失。
   
我sql的groupby维度有加一个具体的分钟字段,所以一小时之后是不可能有一模一样的维度数据,那过期的数据正常是要被清理掉,那/checkpoint/shared/文件夹大小不断增长是否能说明过期的旧数据还没有被清理?
这种情况应该怎么处理

三、sql具体

CREATE TABLE user_behavior (
   `request_ip` STRING,
   `request_time` BIGINT,
   `header` STRING ,
//这个操作是将时间戳转为分钟
   `t_min` as cast(`request_time`-(`request_time` + 2880)%6 as
BIGINT),
   `ts` as TO_TIMESTAMP(FROM_UNIXTIME(`request_time`/1000-28800,'-MM-dd
HH:mm:ss')),
   WATERMARK FOR `ts` AS `ts` - INTERVAL '60' MINUTE) 
with (
   'connector' = 'kafka',
    
);


CREATE TABLE blackhole_table (
   `cnt` BIGINT,
   `lists` STRING
) WITH (
 'connector' = 'blackhole'
);


insert into blackhole_table 
select 
count(*) as cnt, 
LISTAGG(concat(`request_ip`, `header`, cast(`request_time` as STRING)))
as lists
from user_behavior 
group by `request_ip`,`header`,`t_min`;





--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 求教:动态字段的处理

2021-05-31 文章 Zorro
由于你的DDL是变化的,无法提前预知所有字段,所以首先可以确定的是这个场景无法使用Flink SQL解决。

如果使用DataStream解决的话是可行的,唯一可能存在的问题就是目前社区没有提供DataStream的HBase sink。
如果你需要在DataStream中使用HBase sink的话,可能需要你自定义一个HBase sink或者基于社区的HBase SQL
connector做一些更改。不过这些更改是需要在Java代码层面的。

至于其他的处理逻辑可以用pyFlink很方便的改写。



--
Sent from: http://apache-flink.147419.n8.nabble.com/

??????flink job exception

2021-05-31 文章 day
 history server??

https://ci.apache.org/projects/flink/flink-docs-master/zh/docs/deployment/advanced/historyserver/




----
??: 
   "user-zh"