Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?
我遇到过的问题就是开了 增量checkpoint后,checkpoint会越来越大,关闭之后checkpoint就正常了 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Flink SQL 1.11.3问题请教
我们使用Nifi将数据采集到kafka,然后使用Flink处理kafka中的数据,现在Nifi如果将多条数据当做一条记录(写入kafka中的数据格式为csv或者json)写入kafka后(就是一个offset中包含了多条数据),Flink只会处理其中的一条数据?有没有什么办法让Flink都处理一个offset中的多条数据? yinghua...@163.com
Re: Flink 1.11.2 SQL消费kafka写Hive报错
问题已解决 需要在FLink home的lib中引入kafka connector jar包 - Thanks! Jacob -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?
感谢大佬的回复!以后优先现在邮箱这边讨论发现问题再去提个issue! 我的 idleStateRetention确实是设置3600秒,我先进行测试看看。 -- Sent from: http://apache-flink.147419.n8.nabble.com/
flink sql cdc 采集mysql binlog 可以保留before,after的字段吗
flink sql cdc 采集mysql binlog 可以保留before,after的字段吗? 按照官方的例子,定义表结构后,是最新的字段值? 能否同时保留before和after?
Re:datastream union各个topic的数据后,数据有丢失
已经解决了,去掉循环,把每个kafka topic单独处理,再union 在 2021-06-01 08:54:42,"13631283359" <13631283...@163.com> 写道: 大家好, 我最近用datastream union各个topic的数据后,数据有丢失,比单独消费每个topic的数据少很多 代码如下: /** * 将多个流合并成一个进行处理 * * @param topics 通过配置文件传入的 Topic 和 消费者GroupID * @param env Flink运行环境 * @return 将多个Topic数据流整合成一个流 */ def getUnionConsumer(topics: List[String], env: StreamExecutionEnvironment): DataStream[String] = { var total: DataStream[String] = null for (str <- topics) { val topicName = str.split(":")(0) val groupId = str.split(":")(1) val source_data = getSourceData(topicName, groupId, env) if (total != null) { total = total.union(source_data) } else { total = source_data } } total }
Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?
Hi, 先确定一下,你的 idleStateRetention 是 3600秒?其次,要想看是否所有数据均有用,可以利用 Checkpoints.loadCheckpointMeta [1] 去加载你所保留的checkpoint目录下的 _metadata 文件,然后与当前checkpoint目录下的文件作对比,看是否存在大量的未删除旧文件。 目前仅凭你的描述和一段SQL代码其实很难判断。 可能存在的原因有: 1. 单次checkpoint文件数目过多,JM单点删除跟不上相关速度 2. 整体checkpoint size不大,RocksDB的compaction频率较低,导致相关包含过期数据的旧文件并没有被及时删除,仍然存在checkpoint目录下(保留多个checkpoint同样会放大该问题) 所以还是最好先分析已有checkpoint的meta文件,再与目前的实际情况做对比。 另外,Flink JIRA其实不是一个很好的直接问问题的地方,尤其是这种并不明确的问题,更妥善的方式还是先在邮件列表讨论,具体定位到原因后,再创建相关详细的ticket。 [1] https://github.com/apache/flink/blob/b582991b8b2b8dadb89e71d5002c4a9cc2055e34/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L99 祝好 唐云 From: yujianbo <15205029...@163.com> Sent: Tuesday, June 1, 2021 10:51 To: user-zh@flink.apache.org Subject: Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制? 没有更好的方式吗 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink 1.11.2 SQL消费kafka写Hive报错
Thank you for your reply! 您所说的kafka connector 是*flink-connector-kafka_2.11* 这个依赖吗?这个是Datastream所用的依赖,我在pom中已经引入了* flink-sql-connector-kafka_2.11*依赖了。 我试了引入* flink-connector-kafka_2.11*,但还是会报错的。 - Thanks! Jacob -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink 1.11.2 SQL消费kafka写Hive报错
Thank you for your reply! 您所说的kafka connector 是* flink-connector-kafka_2.11* 这个依赖吗?这个是Datastream所用的依赖,我在pom中已经引入了 *flink-sql-connector-kafka_2.11*依赖了。 我试了引入* flink-connector-kafka_2.11*,但还是会报错的。 - Thanks! Jacob -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?
没有更好的方式吗,这样治标不治本,那我大状态任务会有很多问题 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?
没有更好的方式吗 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?
关闭 增量checkpoint -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink 1.11.2 SQL消费kafka写Hive报错
Hi Jacob, Maybe you miss the kafka connector dependency in your pom, you could refer to this url : https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html Best, LakeShen Jacob <17691150...@163.com> 于2021年6月1日周二 上午9:54写道: > Dear All, > > 我在使用Flink SQL消费kafka并写入hive时报错,代码和报错信息如下 > > 其中,Flink 版本为1.11.2,kafka connector使用的是flink-sql-connector-kafka。 > 搜索了很久,依然没有解决,相同的sql语句在flink sql clients可以执行,但在Java代码中,提交到hadoop集群后就报错如下: > > 请指教 > > *Java Code* > > TableResult tableResult = tableEnvironment.executeSql("DROP TABLE IF EXISTS > user_behavior_kafka_table"); > tableResult.print(); > TableResult tableResult2 = tableEnvironment.executeSql( > "CREATE TABLE user_behavior_kafka_table > (\r\n" + > " `user_id` STRING,\r\n" + > " `item_id` STRING\r\n" + > " ) WITH (\r\n" + > " 'connector' = 'kafka',\r\n" + > " 'topic' = 'TestTopic',\r\n" + > " 'properties.bootstrap.servers' = > 'localhost:9092',\r\n" + > " 'properties.group.id' = > 'consumerTest',\r\n" + > " 'scan.startup.mode' = > 'earliest-offset',\r\n" + > " 'format' = 'json'\r\n" + > ")"); > tableResult2.print(); > > > // 数据写入 > tableEnvironment.getConfig().setSqlDialect(SqlDialect.HIVE); > tableEnvironment.executeSql( > "INSERT INTO user_behavior_hive_table SELECT user_id, > item_id FROM user_behavior_kafka_table"); > > > *POM File* > > > org.apache.flink > flink-json > ${flink.version} > > > > org.apache.flink > flink-streaming-java_2.11 > ${flink.version} > provided > > > > org.apache.flink > > flink-clients_${scala.binary.version} > ${flink.version} > provided > > > > org.apache.flink > > > flink-table-api-java-bridge_${scala.binary.version} > ${flink.version} > provided > > > > org.apache.flink > > flink-table-planner-blink_${scala.binary.version} > ${flink.version} > provided > > > > org.apache.flink > > flink-sql-connector-kafka_${scala.binary.version} > ${flink.version} > > > > > org.apache.flink > flink-shaded-hadoop-2-uber > 2.7.5-10.0 > provided > > > > org.apache.flink > flink-connector-hive_2.11 > ${flink.version} > provided > > > > org.apache.hive > hive-exec > ${hive.version} > provided > > > > *Error Messge* > > Caused by: org.apache.flink.table.api.ValidationException: Cannot discover > a > connector using option ''connector'='kafka''. > at > > org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329) > ~[flink-table-blink_2.11-1.11.2.jar:1.11.2] > at > > org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118) > ~[flink-table-blink_2.11-1.11.2.jar:1.11.2] > at > > org.apache.flink.connectors.hive.HiveDynamicTableFactory.createDynamicTableSource(HiveDynamicTableFactory.java:81) > ~[flink-connector-hive_2.11-1.11.2.jar:1.11.2] > at > > org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122) > ~[flink-table-blink_2.11-1.11.2.jar:1.11.2] > at > > org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135) > ~[flink-table-blink_2.11-1.11.2.jar:1.11.2] > at > > org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78) > ~[flink-table-blink_2.11-1.11.2.jar:1.11.2] > at > > org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492) > ~[flink-table-blink_2.11-1.11.2.jar:1.11.2] > at > > org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415) >
Flink 1.11.2 SQL消费kafka写Hive报错
Dear All, 我在使用Flink SQL消费kafka并写入hive时报错,代码和报错信息如下 其中,Flink 版本为1.11.2,kafka connector使用的是flink-sql-connector-kafka。 搜索了很久,依然没有解决,相同的sql语句在flink sql clients可以执行,但在Java代码中,提交到hadoop集群后就报错如下: 请指教 *Java Code* TableResult tableResult = tableEnvironment.executeSql("DROP TABLE IF EXISTS user_behavior_kafka_table"); tableResult.print(); TableResult tableResult2 = tableEnvironment.executeSql( "CREATE TABLE user_behavior_kafka_table (\r\n" + " `user_id` STRING,\r\n" + " `item_id` STRING\r\n" + " ) WITH (\r\n" + " 'connector' = 'kafka',\r\n" + " 'topic' = 'TestTopic',\r\n" + " 'properties.bootstrap.servers' = 'localhost:9092',\r\n" + " 'properties.group.id' = 'consumerTest',\r\n" + " 'scan.startup.mode' = 'earliest-offset',\r\n" + " 'format' = 'json'\r\n" + ")"); tableResult2.print(); // 数据写入 tableEnvironment.getConfig().setSqlDialect(SqlDialect.HIVE); tableEnvironment.executeSql( "INSERT INTO user_behavior_hive_table SELECT user_id, item_id FROM user_behavior_kafka_table"); *POM File* org.apache.flink flink-json ${flink.version} org.apache.flink flink-streaming-java_2.11 ${flink.version} provided org.apache.flink flink-clients_${scala.binary.version} ${flink.version} provided org.apache.flink flink-table-api-java-bridge_${scala.binary.version} ${flink.version} provided org.apache.flink flink-table-planner-blink_${scala.binary.version} ${flink.version} provided org.apache.flink flink-sql-connector-kafka_${scala.binary.version} ${flink.version} org.apache.flink flink-shaded-hadoop-2-uber 2.7.5-10.0 provided org.apache.flink flink-connector-hive_2.11 ${flink.version} provided org.apache.hive hive-exec ${hive.version} provided *Error Messge* Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option ''connector'='kafka''. at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2] at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2] at org.apache.flink.connectors.hive.HiveDynamicTableFactory.createDynamicTableSource(HiveDynamicTableFactory.java:81) ~[flink-connector-hive_2.11-1.11.2.jar:1.11.2] at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2] at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2] at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2] at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2] at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2] at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2] at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2] at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?
有没有大佬帮忙看看 -- Sent from: http://apache-flink.147419.n8.nabble.com/
datastream union各个topic的数据后,数据有丢失
大家好, 我最近用datastream union各个topic的数据后,数据有丢失,比单独消费每个topic的数据少很多 代码如下: /** * 将多个流合并成一个进行处理 * * @param topics 通过配置文件传入的 Topic 和 消费者GroupID * @param env Flink运行环境 * @return 将多个Topic数据流整合成一个流 */ def getUnionConsumer(topics: List[String], env: StreamExecutionEnvironment): DataStream[String] = { var total: DataStream[String] = null for (str <- topics) { val topicName = str.split(":")(0) val groupId = str.split(":")(1) val source_data = getSourceData(topicName, groupId, env) if (total != null) { total = total.union(source_data) } else { total = source_data } } total }
Flink state processor API with Avro data type
大家好, 我使用 Flink 1.10.1 并尝试使用 Flink State Processor API 从Savepoint读取 flink state 状态。 当状态Type 是普通 Java type或 Java POJOs时, 运行良好。 当 Avro 生成的 Java class 用作状态类型 state type时,不工作。 在这种Avro class情况下是否需要额外的序列化 serializers? 谢谢 谭民
Re: 回复:Flink sql的state ttl设置
或许你可以参考这个: [image: image.png] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/config/ Best, LakeShen chenchencc <1353637...@qq.com> 于2021年5月28日周五 下午4:30写道: > 想问下state ttl能针对单表设置吗? > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ >
Re: rocksdb状态后端最多保留checkpoints问题
在增量 checkpoint 下,你可以简单理解状态几乎都存在 checkpoint 目录中的 shared 目录, 所以即使清理 checkpoint,也只是先将这次 checkpoint 引用的相关文件句柄的引用数减1, 只有一个文件没有 checkpoint 引用它时,才会真正删除该文件。 Best, LakeShen. 刘建刚 于2021年5月28日周五 下午7:03写道: > 增量快照的原理是sst文件共享,系统会自动帮助你管理sst文件的引用,类似java的引用,并不会因为一个快照删除了就会把实际的数据删除掉。 > 也就不会发生你说的情况 > > tison 于2021年5月28日周五 上午1:47写道: > > > rocksdb 增量 checkpoint 不是你这么理解的,总的不会恢复不了。原因可以参考下面的材料 > > > > - > > > https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html > > 官方 blog 介绍 > > - https://www.bilibili.com/video/BV1db411e7x2 施博士的介绍,大概 24 分钟开始讲 > > > > Best, > > tison. > > > > > > casel.chen 于2021年5月27日周四 下午11:35写道: > > > > > 作业中使用了增量式的rocksdb状态后端,请问如果再设置了最多保留checkpoints个数的话,会不会造成rocksdb > > > > > > state恢复失败?例如,假设完整的state恢复需要最近10个chk,但因为设置了最多保留checkpoints个数为5的话,状态是不是恢复不了了? > > >
Re: 流与流 left join
Hi,或许 Flink SQL interval join 能够满足你的需求。 Best, LakeShen. Shuo Cheng 于2021年5月31日周一 下午12:10写道: > state ttl 只能是全局算子维度, table.exec.state.ttl > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ >
Re: 求教:动态字段的处理
看下你的 Flink 版本是多少,如果是高版本的话,社区有提供 DataStream 的 HBase Sink。 Best, LakeShen. Zorro 于2021年5月31日周一 下午2:41写道: > 由于你的DDL是变化的,无法提前预知所有字段,所以首先可以确定的是这个场景无法使用Flink SQL解决。 > > 如果使用DataStream解决的话是可行的,唯一可能存在的问题就是目前社区没有提供DataStream的HBase sink。 > 如果你需要在DataStream中使用HBase sink的话,可能需要你自定义一个HBase sink或者基于社区的HBase SQL > connector做一些更改。不过这些更改是需要在Java代码层面的。 > > 至于其他的处理逻辑可以用pyFlink很方便的改写。 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/
不同的程序在同一时间段报同一个异常
大家好: 最近在生产中,不同项目组的两个flink程序在同一时间段都报下面异常 ERROR org.apache.flink.runtime.blob.BlobServerConnection -Error while excuting Blob connection . . . org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException :Adjusted frame length exceeds 10485760: 1347375960 -discarded 上面的报错信息,刚开始知道其中一个报错的时候,怀疑是状态过大导致的。但是后来又有另外一个程序报同样的错误,现在不确定是什么问题导致,请问大佬这个可能是什么原因
Re: flink sink kafka from checkpoint run failed
我也遇到了 请问你解决了没 -- Sent from: http://apache-flink.147419.n8.nabble.com/
怎么关闭operatorChaining
版本flink 1.11.2 EnvironmentSettings build = EnvironmentSettings.newInstance().inBatchMode().build(); TableEnvironment tEnv = TableEnvironment.create(build);
Re: Re: flink on yarn 模式下,yarn集群的resource-manager切换导致flink应用程序重启,并且未从最后一次checkpoint恢复
HA在ZK里面记录了最后一次成功的checkpoint counter和地址,没有启用HA的话,就是从指定的savepoint恢复的。 Best, Yang 刘建刚 于2021年5月28日周五 下午6:51写道: > 那应该是master failover后把快照信息丢失了,ha应该能解决这个问题。 > > 董建 <62...@163.com> 于2021年5月28日周五 下午6:24写道: > > > 稳定复现 > > checkpoint 正常生成,在web ui和hdfs目录里边都可以确认。 > > 我们jobmanager没有做ha,不知道是否是这个原因导致的? > > 日志里边能看到是从指定的-s恢复的,没有指定-s的时候,重启的时候也并没有使用最新的checkpoint文件。 > > 目前这个问题困扰了我很久,也没有一个好的思路,下一步先把ha搞起来再试试。 > > >> org.apache.flink.configuration.GlobalConfiguration [] - > > Loading > > >> configuration property: execution.savepoint.path, > > >> > > > hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100 > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 在 2021-05-28 18:15:38,"刘建刚" 写道: > > >这种情况是不符合预期的。请问通过以下步骤可以稳定复现吗? > > >1、从savepoint恢复; > > >2、作业开始定期做savepoint; > > >3、作业failover。 > > >如果是的话,可能需要排查下checkpoint 文件是否存在,zookeeper上是否更新。 > > >如果还是有问题,需要通过日志来排查了。 > > > > > >董建 <62...@163.com> 于2021年5月28日周五 下午5:37写道: > > > > > >> 我遇到的问题现象是这样的 > > >> > > >> > > >> > > >> > > >> 1、flink版本flink-1.12.2,启动命令如下,指定-s是因为job有做过cancel,这里重启。 > > >> > > >> > > >> > > >> > > >> flink run -d -s > > >> > > > hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100 > > >> -t yarn-per-job -m yarn-cluser -D yarn.application.name= > > >> /tmp/flink-1.0-SNAPSHOT.jar -c com.test.myStream --profile prod > > >> > > >> > > >> > > >> > > >> 2、flink-conf.xml > > >> > > >> > > >> > > >> > > >> state.checkpoints.dir: hdfs:///user/flink/checkpoints/default > > >> > > >> > > >> > > >> > > >> 3、代码checkpoint设置 > > >> > > >> > > >> > > >> > > >>StreamExecutionEnvironment env = > > >> StreamExecutionEnvironment.getExecutionEnvironment(); > > >> > > >> > > >> > > >> > > >>env.setRestartStrategy(RestartStrategies.fixedDelayRestart(100, > > >> 10)); > > >> > > >> > > >> > > >> > > >>CheckpointConfig checkpointConfig = env.getCheckpointConfig(); > > >> > > >> > > >> > > >> > > >> > > >> > > > checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); > > >> > > >> > > >> > > >> > > >>env.enableCheckpointing(1 * 60 * 1000); > > >> > > >> > > >> > > >> > > >> > > >> > checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); > > >> > > >> > > >> > > >> > > >>checkpointConfig.setTolerableCheckpointFailureNumber(100); > > >> > > >> > > >> > > >> > > >>checkpointConfig.setCheckpointTimeout(60 * 1000); > > >> > > >> > > >> > > >> > > >>checkpointConfig.setMaxConcurrentCheckpoints(1); > > >> > > >> > > >> > > >> > > >> 4、问题现象 > > >> > > >> > > >> > > >> > > >> a)运维同事切换yarn > > >> > > > resourc-manager,我的flink任务也会重启(重启后application-id和job-id并没有改变),但是jobmanager和taskmanager更换了机器 > > >> > > >> > > >> > > >> > > >> > > >> b)我的flink任务每隔1分钟做一个checkpoint,假如任务在上次重启后又运行了一段时间,checkpoint已经到了chk-200 > > >> > > >> > > >> > > >> > > >> c)集群的同事再次切换resource-manager,这个时候预期是flink任务自动从chk-200 > > >> restore,从日志中看还是从chk-100 restore的。 > > >> > > >> > > >> > > >> > > >> d)任务大致逻辑是通过flink-mysql-cdc消费binlog,DebeziumSourceFunction > > >> sourceMilApplysLogStream = MySQLSource.builder() > > >> > > >> > > >> > > >> > > >> 重启导致了再次从chk-100的binlog pos开始消费,即了chk-100~chk-200之间的binlog重复消费 > > >> > > >> > > >> > > >> > > >> e)日志中有打印如下,难道是因为上次我启动flink任务的时候用了-s,所以不会去自动找最新的checkpoint重启吗? > > >> > > >> > > >> > > >> > > >> 2021-05-24 16:49:50,398 INFO > > >> org.apache.flink.configuration.GlobalConfiguration [] - > > Loading > > >> configuration property: execution.savepoint.path, > > >> > > > hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100 > > >> > > >> > > >> > > >> 预期是任务重启后自动从最新的checkpoint开始继续消费,即从chk-200开始消费 > > >> > > >> > > >> > > >> > > >> 现在的问题是任务重启后总是从flink -s指定的checkpoint恢复的。 > > >
Re: flink 1.13 k8s native 启动找不到 KubernetesSessionClusterEntrypoint
你可以describe一下失败JM的pod发出来,看看生成的启动命令是不是正确的 Best, Yang fz 于2021年5月28日周五 下午10:09写道: > 镜像: flink:1.13.0-scala_2.11 > > sed: cannot rename /opt/flink/conf/sed1yRdDY: Device or resource busy > sed: cannot rename /opt/flink/conf/sed03zP3W: Device or resource busy > /docker-entrypoint.sh: line 73: /opt/flink/conf/flink-conf.yaml: Read-only > file system > sed: cannot rename /opt/flink/conf/sedFtORA0: Device or resource busy > mv: cannot move '/opt/flink/conf/flink-conf.yaml.tmp' to > '/opt/flink/conf/flink-conf.yaml': Device or resource busy > + /usr/local/openjdk-8/bin/java -classpath '/opt/flink/lib/*' -Xms30720m > -Xmx30720m -Dlogback.configurationFile=file:/opt/flink/conf/logback.xml > -Dlog4j.configuration=file:/opt/flink/conf/log4j.properties > org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint > Error: Could not find or load main class > org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ >
Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?
一、环境: 1、版本:1.12.0 2、flink sql 3、已经设置了setIdleStateRetention 为1小时 4、状态后端是rocksDB, 增量模式 5、源数据没有数据激增情况,任务已经跑了两天 二、详情 具体sql见第三大点,就是普通的group by统计的 sql,然后设置setIdleStateRetention(3600)。目前观察两天了,checkpoint目录下面的shared文件夹的大小一直在增长,然后看文件夹里的文件是在一直更新,最早的文件也会消失。 我sql的groupby维度有加一个具体的分钟字段,所以一小时之后是不可能有一模一样的维度数据,那过期的数据正常是要被清理掉,那/checkpoint/shared/文件夹大小不断增长是否能说明过期的旧数据还没有被清理? 这种情况应该怎么处理 三、sql具体 CREATE TABLE user_behavior ( `request_ip` STRING, `request_time` BIGINT, `header` STRING , //这个操作是将时间戳转为分钟 `t_min` as cast(`request_time`-(`request_time` + 2880)%6 as BIGINT), `ts` as TO_TIMESTAMP(FROM_UNIXTIME(`request_time`/1000-28800,'-MM-dd HH:mm:ss')), WATERMARK FOR `ts` AS `ts` - INTERVAL '60' MINUTE) with ( 'connector' = 'kafka', ); CREATE TABLE blackhole_table ( `cnt` BIGINT, `lists` STRING ) WITH ( 'connector' = 'blackhole' ); insert into blackhole_table select count(*) as cnt, LISTAGG(concat(`request_ip`, `header`, cast(`request_time` as STRING))) as lists from user_behavior group by `request_ip`,`header`,`t_min`; -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 求教:动态字段的处理
由于你的DDL是变化的,无法提前预知所有字段,所以首先可以确定的是这个场景无法使用Flink SQL解决。 如果使用DataStream解决的话是可行的,唯一可能存在的问题就是目前社区没有提供DataStream的HBase sink。 如果你需要在DataStream中使用HBase sink的话,可能需要你自定义一个HBase sink或者基于社区的HBase SQL connector做一些更改。不过这些更改是需要在Java代码层面的。 至于其他的处理逻辑可以用pyFlink很方便的改写。 -- Sent from: http://apache-flink.147419.n8.nabble.com/
??????flink job exception
history server?? https://ci.apache.org/projects/flink/flink-docs-master/zh/docs/deployment/advanced/historyserver/ ---- ??: "user-zh"