Re: Does the Flink connector support ignoring delete messages?
Hi, shi franke. You can try implementing your own DynamicTableSink and add a "sink.ignore-delete" option to it. You can refer to some implementations on GitHub, for example the ClickHouse connector: https://github.com/liekkassmile/flink-connector-clickhouse-1.13

shi franke wrote on Fri, Jul 7, 2023 at 19:24:
>
> Thanks for the reply — a custom implementation like that would work. We are currently on Flink 1.15. I wanted to check whether the community has framework-level support for this option, since it seems like a fairly common piece of configuration.
> junjie.m...@goupwith.com wrote on Fri, Jul 7, 2023 at 17:57:
> >
> > You can filter by RowKind yourself with the DataStream API. Example code:
> > import org.apache.flink.api.common.functions.RichFlatMapFunction;
> > import org.apache.flink.types.Row;
> > import org.apache.flink.types.RowKind;
> > import org.apache.flink.util.Collector;
> >
> > /**
> >  * Filter function that keeps only the append (insert) part of a changelog.
> >  */
> > public class AppendOnlyFilterFunction extends RichFlatMapFunction<Row, Row> {
> >
> >     private boolean includedUpdateAfter = false;
> >
> >     public AppendOnlyFilterFunction() {
> >     }
> >
> >     public AppendOnlyFilterFunction(boolean includedUpdateAfter) {
> >         this.includedUpdateAfter = includedUpdateAfter;
> >     }
> >
> >     @Override
> >     public void flatMap(Row row, Collector<Row> collector) throws Exception {
> >         if (RowKind.INSERT == row.getKind()) {
> >             collector.collect(row);
> >         } else if (includedUpdateAfter && RowKind.UPDATE_AFTER == row.getKind()) {
> >             row.setKind(RowKind.INSERT);
> >             collector.collect(row);
> >         }
> >     }
> > }
> >
> > From: shi franke
> > Sent: 2023-07-07 17:33
> > To: user-zh
> > Subject: Does the Flink connector support ignoring delete messages?
> > A question for the list: do any connectors currently support an option to ignore delete
> > messages? For example, when the upstream is an upsert/retract stream, is there a connector
> > option to ignore delete messages and treat it as an append-only stream, processing only the
> > insert messages? I don't see such a feature in the current code, and I'm not sure whether
> > there is a related JIRA or implementation.
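The quoted AppendOnlyFilterFunction boils down to one small decision per record. As a minimal, Flink-free sketch of that rule — note the RowKind enum below is a local stand-in for org.apache.flink.types.RowKind (which has the same four values), so this runs without any Flink dependency:

```java
// Stand-in for org.apache.flink.types.RowKind; the real enum has the same four values.
enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

class AppendOnlyRule {
    /**
     * Decides whether a changelog record survives an "ignore delete" filter.
     * INSERT always passes; UPDATE_AFTER passes only when includeUpdateAfter is set
     * (the real function then re-stamps it as INSERT); UPDATE_BEFORE and DELETE are dropped.
     */
    static boolean shouldEmit(RowKind kind, boolean includeUpdateAfter) {
        return kind == RowKind.INSERT
                || (includeUpdateAfter && kind == RowKind.UPDATE_AFTER);
    }

    public static void main(String[] args) {
        System.out.println(shouldEmit(RowKind.INSERT, false));       // true
        System.out.println(shouldEmit(RowKind.DELETE, true));        // false
        System.out.println(shouldEmit(RowKind.UPDATE_AFTER, true));  // true
    }
}
```

The same predicate is what a "sink.ignore-delete" option in a custom DynamicTableSink would effectively apply per row before writing.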
Re: Kafka source exception on Flink 1.17.1
Hi, aiden. This looks like a class conflict. According to the official documentation, you should not need to depend on flink-core or flink-connector-base when using the Kafka connector (https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/kafka/). If you need those two jars for some other reason, you can run `mvn dependency:tree` to see where "org/apache/kafka/clients/consumer/ConsumerRecord" is being loaded a second time, and exclude it from whatever artifact other than flink-connector-kafka brings it in.

aiden <18765295...@163.com> wrote on Tue, Jul 4, 2023 at 14:23:
> hi
> I hit the following exception when using the Kafka source on 1.17.1:
> Caused by: java.lang.LinkageError: loader constraint violation: loader
> (instance of org/apache/flink/util/ChildFirstClassLoader) previously
> initiated loading for a different type with name
> "org/apache/kafka/clients/consumer/ConsumerRecord"
>     at java.lang.ClassLoader.defineClass1(Native Method)
>     at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
>     at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>     at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
>     at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
>     at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
>     at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
>     at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:71)
>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:51)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>     at java.lang.Class.getDeclaredMethods0(Native Method)
>     at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
>     at java.lang.Class.getDeclaredMethod(Class.java:2128)
>     at java.io.ObjectStreamClass.getPrivateMethod(ObjectStreamClass.java:1629)
>     at java.io.ObjectStreamClass.access$1700(ObjectStreamClass.java:79)
>     at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:520)
>     at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:494)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:494)
>     at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391)
>     at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:681)
>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:534)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:522)
>     at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:67)
>     at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:471)
>     at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286)
>     at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223)
>     ... 20 more
> Here is the relevant part of my POM:
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-core</artifactId>
>   <version>1.17.1</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-connector-kafka</artifactId>
>   <version>1.17.1</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-connector-base</artifactId>
>   <version>1.17.1</version>
> </dependency>
> This looks like a classloader problem — what do I need to change?
Re: FilterPushDown and parallelism for stream-table joins in Flink 1.16
Hi, Chai Kelun. Your filter condition contains your own UDF, so it does not qualify for filter pushdown: from the optimizer's point of view a UDF is non-deterministic, and the optimizer cannot extract a pushable condition from it. If you want pushdown to happen, try factoring out a deterministic condition, e.g. product.id > 10. Also, Flink does support broadcast hash joins; if you want to control the join type of a particular pair of tables, you can use a join hint to set the join type to broadcast.

Chai Kelun wrote on Mon, Jul 3, 2023 at 17:58:
> There is a Kafka stream table logClient(id int, name string, price double) and a customConnector
> dimension table product(id int, name string, value double) that implements SupportsFilterPushDown.
> I implemented a custom function MyUDF(double d1, double d2) for custom logic, with pushdown
> support for that operator.
> In the stream-table join scenario, the SQL below does not push the operator down; instead all
> operators are pulled up to the Join node via TableScan. Is there an option to enable pushdown
> (similar to a nested-loop join, pushing the computation down to the product table's source)?
> SELECT A.id, A.name, B.name FROM logClient AS A, product AS B WHERE MyUDF(B.value, A.price) < xxx;
> Also, both the Kafka and customConnector sources support parallelism. For the join I would like
> BROADCAST distribution, computing against the product table on every partition of the logClient
> stream. But it seems the distribution mode for Flink stream-table joins currently supports only
> SINGLETON and HASH[KEY] (the switch case at StreamExecExchange.java line 106). Will the community
> consider supporting more distributionTypes in the future?
>
> Thanks a lot!
Re: Can Flink CDC sync DDL statements?
Currently the community's Flink CDC can read the DDL statements in the binlog, but it does not forward them downstream, so downstream operators cannot observe the schema change.

Xuyang wrote on Mon, Oct 10, 2022 at 16:46:
> Hi, this is currently not possible.
> At 2022-09-26 23:27:05, "casel.chen" wrote:
> > Can flink cdc sync DDL statements to keep schemas in sync? For example create table,
> > create index, truncate table, etc.
Re: In Flink's data transfer, does the upstream operator push to the downstream or does the downstream pull? What is the design rationale?
Hello. Flink uses a pull model. The advantages of the pull model are: 1. better scalability — downstream consumers can be added on demand, since they only need the upstream consumption position; 2. downstream consumers can adjust their consumption rate to their own needs; 3. on the network side, Flink did experiment with a push model in the past, and since TCP connections are multiplexed between processes to save overhead, a performance bottleneck in a single task thread would stop every task thread on that link from receiving data and hurt the overall consumption rate. The advantage of the push model is lower overhead: no mechanism is needed to keep polling the upstream node for available data.

Xuyang wrote on Fri, Sep 9, 2022 at 20:35:
> Hi, it is mainly a pull model: the downstream actively pulls data from the upstream. When the
> downstream's consumption capacity reaches its limit, the backpressure mechanism makes the
> upstream produce less data.
>
> --
> Best!
> Xuyang
>
> At 2022-09-09 19:04:27, "郑 致远" wrote:
> > Hi all,
> > a question: in Flink's data transfer, does the upstream operator push to the downstream,
> > or does the downstream operator pull from the upstream? What were the design considerations?
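The pull idea above can be caricatured in a few lines of plain Java — a downstream consumer that drains an upstream buffer at its own pace, so a slow consumer simply leaves records upstream instead of being overrun. This is only an illustration of the pull principle, not Flink's actual credit-based network stack:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class PullDemo {
    /** The downstream drains the upstream buffer at its own rate; leftovers stay upstream. */
    static List<Integer> pullBatch(Queue<Integer> upstreamBuffer, int rate) {
        List<Integer> batch = new ArrayList<>();
        for (int i = 0; i < rate && !upstreamBuffer.isEmpty(); i++) {
            batch.add(upstreamBuffer.poll());
        }
        return batch;
    }

    public static void main(String[] args) {
        // "Upstream" produces into a buffer; it never pushes to the consumer.
        Queue<Integer> upstream = new ArrayDeque<>(List.of(1, 2, 3, 4, 5));
        // A slow consumer pulls a small batch; the rest simply stays in the upstream
        // buffer (backpressure), instead of the producer pushing and overrunning it.
        List<Integer> first = pullBatch(upstream, 2);
        System.out.println(first);           // [1, 2]
        System.out.println(upstream.size()); // 3
    }
}
```

A faster consumer would just call pullBatch with a larger rate or more often; nothing on the producer side needs to change, which is the scalability point the reply makes.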
Re: A question about the Flink hybrid source
Hello. The hybrid source currently has to be built from FLIP-27 sources (e.g. FileSource, KafkaSource); non-FLIP-27 sources need some modification before they can be used with it. If you want to get involved in extending the hybrid source, you can join the Flink community on Slack and start a discussion there. For documentation on sources, see the official site and the FLIP design and discussion pages (https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source) (https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface). Hope this helps.

casel.chen wrote on Mon, Sep 19, 2022 at 19:42:
> I have a Flink real-time computation scenario that first reads the existing data of a table
> from MaxCompute, then reads the incremental data from the corresponding kafka topic, and
> processes both together. It looks like I need a hybrid source. The latest Flink community
> release provides Kafka/Hive/File sources — do sources for other systems have to be developed
> by myself? Is there a place in the community for contributing sources?
> Are there any articles or blog posts on writing a custom source against the new source
> architecture? Thanks!
Re: Why is a null pointer error thrown here? Is it related to the source table data?
Hi, it looks like some dirty-data handling logic is needed here. You could open an issue on JIRA with more detailed error information and the possibly dirty data involved, and reply on this thread once the issue is created.

Asahi Lee wrote on Wed, Sep 14, 2022 at 09:33:
> Yes, the error occurred after the job had been running for a while. Shouldn't the code at the
> null-pointer site do a non-null check?
>
> ------ Original message ------
> From: "user-zh" <xyzhong...@163.com>
> Sent: Friday, Sep 9, 2022, 20:37
> To: "user-zh"
> Subject: Re: Why is a null pointer error thrown here? Is it related to the source table data?
>
> Hi, it looks like a dirty record was encountered — did the job fail suddenly after running for
> a while?
>
> --
> Best!
> Xuyang
>
> At 2022-09-09 11:46:47, "Asahi Lee" wrote:
> 2022-09-09 11:36:42,866 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] -
> Source: HiveSource-ods_jt_hrs.ods_hrmis_HR_EMPL_Education (1/1)
> (2a68412dab3602a1eeda5a750b308e23) switched from RUNNING to FAILED on
> container_1658144991761_106260_01_02 @ hhny-cdh05 (dataPort=45015).
> java.lang.RuntimeException: null
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:43) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>     at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:196) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>     at org.apache.flink.streaming.api.operators.source.NoOpTimestampsAndWatermarks$TimestampsOnlyOutput.collect(NoOpTimestampsAndWatermarks.java:97) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>     at org.apache.flink.streaming.api.operators.source.NoOpTimestampsAndWatermarks$TimestampsOnlyOutput.collect(NoOpTimestampsAndWatermarks.java:91) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>     at org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:45) ~[flink-table_2.11-1.14.3.jar:1.14.3]
>     at org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:35) ~[flink-table_2.11-1.14.3.jar:1.14.3]
>     at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143) ~[flink-table_2.11-1.14.3.jar:1.14.3]
>     at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:350) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>     at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_181]
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:118) ~[flink-table_2.11-1.14.3.jar:1.14.3]
>     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.toBinaryRow(RowDataSerializer.java:204) ~[flink-table_2.11-1.14.3.jar:1.14.3]
>     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:103) ~[flink-table_2.11-1.14.3.jar:1.14.3]
>     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:48) ~[flink-table_2.11-1.14.3.jar:1.14.3]
>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:168) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
Re: Questions about flink table store
Hello. Speaking only for myself, I think flink-table-store is positioned differently from hudi and iceberg. Hudi and iceberg are more like table formats: through the format they manage schema information and solve industry pain points, and they are not tied to any particular compute engine. Hudi solves upsert over very large data volumes; iceberg addresses OSS storage and moving to the cloud. But at heart they are still storage formats, which is both their strength and their weakness: the strength is being engine-independent and focused on the format layer itself; the weakness is that they cannot take part in the future planning of a mainstream engine, are harder to extend, are constrained in their development, and cannot move quickly into areas such as OLAP. flink-table-store, by contrast, is closer to Spark's Delta: a real-time data-warehouse solution deeply bound to Flink and serving Flink's storage needs. It can combine Flink's strong stream-processing capabilities to meet real-time warehousing requirements more simply, and it can evolve along with the plans of future Flink versions and participate in the community, which makes it better able to meet user needs.

In short, flink-table-store is positioned not merely as data-lake storage, but as a real-time data-warehouse solution built on data-lake storage, deeply bound to and complementing Flink.

Kyle Zhang wrote on Thu, Sep 8, 2022 at 08:37:
> Hi all,
> The table store introduction also talks about data-lake storage and real-time streaming reads
> — how does its positioning differ from projects like iceberg and hudi, and why develop yet
> another project?
>
> Best.
Re: When a savepoint is triggered, does the source operator stop consuming at the corresponding offset?
hi, in my understanding a savepoint works much like a checkpoint; before Flink 1.16 savepoints only supported the full (non-incremental) form, and both are implemented on the barrier mechanism. According to the 1.16 planning document (https://cwiki.apache.org/confluence/display/FLINK/FLIP-203%3A+Incremental+savepoints), savepoints will also support an incremental mode. When a savepoint is triggered, the source goes to snapshot its state, and it does stop consuming while doing so.

郑 致远 wrote on Thu, Sep 8, 2022 at 19:39:
> Hi all,
> a question:
> Is a savepoint also implemented with the barrier mechanism?
> When a savepoint is triggered, does the source operator stop consuming from kafka?
Re: hello flink
Hello

yh z wrote on Fri, Sep 2, 2022 at 11:51:
> hello flink
How to initialize s3p when debugging locally
Hi all:
I understand that for an actual deployment the jar is copied into the plugins directory, but when debugging locally, how should the s3p file system be initialized?
Flink 1.14, Windows 10. The project was created from flink-quickstart, and the following dependency was added to the POM:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-s3-fs-presto</artifactId>
  <version>${flink.version}</version>
</dependency>

The initialization code looks like this:

Configuration fileSystemConf = new Configuration();
fileSystemConf.setBoolean("presto.s3.connection.ssl.enabled", false);
fileSystemConf.setString("presto.s3.access-key", "minioadmin");
fileSystemConf.setString("presto.s3.secret-key", "minioadmin");
fileSystemConf.setString("presto.s3.endpoint", "http://127.0.0.1:9000");
FileSystem.initialize(fileSystemConf);
Path path = new Path("s3p://test/");
System.out.println(path.getFileSystem().exists(path));

But it throws the following exception:

Exception in thread "main" org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3p'. The scheme is directly supported by Flink through the following plugin: flink-s3-fs-presto. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:515)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
    at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274)
    at org.example.StreamingJob.main(StreamingJob.java:58)

Strangely, s3a does work for me, initialized with the following configuration:

fileSystemConf.setBoolean("fs.s3a.connection.ssl.enabled", false);
fileSystemConf.setString("fs.s3a.endpoint", "http://127.0.0.1:9000");
fileSystemConf.setString("fs.s3a.access.key", "minioadmin");
fileSystemConf.setString("fs.s3a.secret.key", "minioadmin");
fileSystemConf.setString("fs.s3a.path.style.access", "true");
fileSystemConf.setString("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");

Thanks!
A Flink SQL join question
hi, I am using Flink SQL to join data from Kafka with a MySQL table A on its id and ts columns.
How to load tables and UDFs dynamically
Hello, I have hit the following difficulty in development. Currently the table and UDF definitions are written in the conf/sql-client-defaults.yaml file. The program runs fine, but this brings a problem: whenever I need to add or modify a table or a UDF, I have to restart the Flink program. Is there a way to add or update my tables and UDFs dynamically (tables can be added through the sql-client, but I would rather keep the table definitions in a file; UDFs get both modified and added), so that Flink does not need to be restarted?
Re: flink 1.11 taskmanager memory usage exceeds the configured size
It's RocksDB.
------ Original message ------
From: "Z-Z"
flink 1.11 taskmanager memory usage exceeds the configured size
Hi, I am running a Flink session cluster in Docker, with the taskmanager configured as:
taskmanager.memory.process.size: 5120m
taskmanager.memory.jvm-metaspace.size: 1024m
but the taskmanager actually uses 7.5G. The taskmanager log reports:
INFO [] - Final TaskExecutor Memory configuration:
INFO [] -   Total Process Memory:          5.000gb (5368709120 bytes)
INFO [] -     Total Flink Memory:          3.500gb (3758096376 bytes)
INFO [] -       Total JVM Heap Memory:     1.625gb (1744830433 bytes)
INFO [] -         Framework:               128.000mb (134217728 bytes)
INFO [] -         Task:                    1.500gb (1610612705 bytes)
INFO [] -       Total Off-heap Memory:     1.875gb (2013265943 bytes)
INFO [] -         Managed:                 1.400gb (1503238572 bytes)
INFO [] -         Total JVM Direct Memory: 486.400mb (510027371 bytes)
INFO [] -           Framework:             128.000mb (134217728 bytes)
INFO [] -           Task:                  0 bytes
INFO [] -           Network:               358.400mb (375809643 bytes)
INFO [] -     JVM Metaspace:               1024.000mb (1073741824 bytes)
INFO [] -     JVM Overhead:                512.000mb (536870920 bytes)
With JDK 1.8, trying to take a heap dump with jmap fails even with -F:
1: Unable to open socket file: target process not responding or HotSpot VM not loaded
What could be the cause?
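As a quick sanity check on the breakdown in that log, the configured taskmanager.memory.process.size is exactly the sum of the reported top-level components — Total Flink Memory (3.5 GB) plus JVM Metaspace (1 GB) plus JVM Overhead (512 MB) — so anything the container uses beyond 5 GB is memory that this accounting does not cover (e.g. native allocations, as the RocksDB reply suggests):

```java
class TmMemoryCheck {
    /** Sums the top-level components of the TaskExecutor memory model, in MB. */
    static int processSizeMb(int totalFlinkMb, int metaspaceMb, int overheadMb) {
        return totalFlinkMb + metaspaceMb + overheadMb;
    }

    public static void main(String[] args) {
        // Figures from the posted log: 3.500gb Flink memory, 1024mb metaspace, 512mb overhead.
        System.out.println(processSizeMb(3584, 1024, 512)); // 5120
    }
}
```

That 5120 matches taskmanager.memory.process.size: 5120m, so the configuration itself is internally consistent; the extra ~2.5 GB observed in the container sits outside this model.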
Re: Flink Cli savepoint restore problem
When restoring via the CLI, the command is: bin/flink run -d -p 1 -s {savepointuri} /data/test.jar (the jobmanager log is as posted). Via the web UI at http://jobmanager:8081, it is Submit New Job with the jar and the savepoint path filled in.
------ Original message ------
From: "user-zh"
Re: Flink Cli savepoint restore problem
The taskmanager log (the same whether submitted via the CLI or the web UI):
2020-07-20 03:29:25,959 WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'value.serializer' was supplied but isn't a known config.
2020-07-20 03:29:25,959 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.11.0.2
2020-07-20 03:29:25,959 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 73be1e1168f91ee2
2020-07-20 03:29:25,974 ERROR org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder - Caught unexpected exception.
java.lang.ArrayIndexOutOfBoundsException: 0
    at org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.hasMetaDataFollowsFlag(RocksSnapshotUtil.java:45)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateData(RocksDBFullRestoreOperation.java:223)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:168)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:151)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)
2020-07-20 03:29:25,974 WARN org.apache.flink.streaming.api.operators.BackendRestorerProcedure - Exception while restoring keyed state backend for StreamMap_caf773fe289bfdb867e0b4bd0c431c5f_(1/1) from alternative (1/1), will retry while more alternatives are available.
org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
Re: Flink Cli savepoint restore problem
Flink 1.10.0; the taskmanager reports:
2020-07-17 15:06:43,913 ERROR org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder - Caught unexpected exception.
java.io.EOFException
    at java.io.DataInputStream.readFully(DataInputStream.java:197)
    at java.io.DataInputStream.readFully(DataInputStream.java:169)
    at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:85)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateData(RocksDBFullRestoreOperation.java:221)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:168)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:151)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)
2020-07-17 15:06:43,914 WARN org.apache.flink.streaming.api.operators.BackendRestorerProcedure - Exception while restoring keyed state backend for KeyedCoProcessOperator_00360b8021b192d84949201d4fea80f2_(1/1) from alternative (1/1), will retry while more alternatives are available.
org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
Flink Cli savepoint restore problem
I trigger a savepoint through the REST API (/jobs/overview → /jobs/{jobid}/savepoints → /jobs/{jobid}/savepoints/{triggerid}), and then try to restore from it — both with the flink CLI and from the web UI by submitting the jar with the savepoint path. The jobmanager log:
2020-07-17 09:51:48,925 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Request slot with profile ResourceProfile{UNKNOWN} for job 7639673873b707aa86c4387aa7b4aac3 with allocation id e8865cdbfe4c3c33099c7112bc2e3231.
2020-07-17 09:51:48,952 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source - Filter (1/1) (1177659bff014e8dbc3f0508055d4307) switched from SCHEDULED to DEPLOYING.
2020-07-17 09:51:48,952 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Source: Custom Source - Filter (1/1) (attempt #0) to e63d829deafc144cd82efd73979dd056 @ 083f69d029de (dataPort=35758)
2020-07-17 09:51:48,953 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/1) (141f0dc22b624b39e21127f637ba63c2) switched from SCHEDULED to DEPLOYING.
2020-07-17 09:51:48,953 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Source: Custom Source (1/1) (attempt #0) to e63d829deafc144cd82efd73979dd056 @ 083f69d029de (dataPort=35758)
2020-07-17 09:51:48,954 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/1) (274b3df03e1fab627059c1a78e4a26da) switched from SCHEDULED to DEPLOYING.
2020-07-17 09:51:48,954 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Source: Custom Source (1/1) (attempt #0) to e63d829deafc144cd82efd73979dd056 @ 083f69d029de (dataPort=35758)
2020-07-17 09:51:48,954 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Co-Process (1/1) (d0309f26a545e74643382ed3f758269b) switched from SCHEDULED to DEPLOYING.
2020-07-17 09:51:48,954 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Co-Process (1/1) (attempt #0) to e63d829deafc144cd82efd73979dd056 @ 083f69d029de (dataPort=35758)
2020-07-17 09:51:48,955 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Co-Process - (Sink: Unnamed, Sink: Unnamed) (1/1) (618b75fcf5ea05fb5c6487bec6426e31) switched from SCHEDULED to DEPLOYING.
2020-07-17 09:51:48,955 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Co-Process - (Sink: Unnamed, Sink: Unnamed) (1/1) (attempt #0) to e63d829deafc144cd82efd73979dd056 @ 083f69d029de (dataPort=35758)
2020-07-17 09:51:49,346 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Co-Process - (Sink: Unnamed, Sink: Unnamed) (1/1) (618b75fcf5ea05fb5c6487bec6426e31) switched from DEPLOYING to RUNNING.
2020-07-17 09:51:49,370 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/1) (274b3df03e1fab627059c1a78e4a26da) switched from DEPLOYING to RUNNING.
2020-07-17 09:51:49,370 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/1) (141f0dc22b624b39e21127f637ba63c2) switched from DEPLOYING to RUNNING.
2020-07-17 09:51:49,377 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Co-Process (1/1) (d0309f26a545e74643382ed3f758269b) switched from DEPLOYING to RUNNING.
2020-07-17 09:51:49,377 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source - Filter (1/1) (1177659bff014e8dbc3f0508055d4307) switched from DEPLOYING to RUNNING.
2020-07-17 09:51:49,493 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Co-Process (1/1) (d0309f26a545e74643382ed3f758269b) switched from RUNNING to FAILED.
java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for
Deploying Flink with Docker: Hadoop dependency problem
Flink 1.11.0, deployed with docker-compose. The docker-compose file:

version: "2.1"
services:
  jobmanager:
    image: flink:1.11.0-scala_2.12
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
      - HADOOP_CLASSPATH=/data/hadoop-2.9.2/etc/hadoop:/data/hadoop-2.9.2/share/hadoop/common/lib/*:/data/hadoop-2.9.2/share/hadoop/common/*:/data/hadoop-2.9.2/share/hadoop/hdfs:/data/hadoop-2.9.2/share/hadoop/hdfs/lib/*:/data/hadoop-2.9.2/share/hadoop/hdfs/*:/data/hadoop-2.9.2/share/hadoop/yarn:/data/hadoop-2.9.2/share/hadoop/yarn/lib/*:/data/hadoop-2.9.2/share/hadoop/yarn/*:/data/hadoop-2.9.2/share/hadoop/mapreduce/lib/*:/data/hadoop-2.9.2/share/hadoop/mapreduce/*:/contrib/capacity-scheduler/*.jar
    volumes:
      - ./jobmanager/conf:/opt/flink/conf
      - ./data:/data
  taskmanager:
    image: flink:1.11.0-scala_2.12
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    volumes:
      - ./taskmanager/conf:/opt/flink/conf
networks:
  default:
    external:
      name: flink-network

hadoop-2.9.2 is placed under the data directory mounted into the jobmanager, and HADOOP_CLASSPATH is set as above. Yet when a job is submitted via the CLI or the web UI, the jobmanager reports: Could not find a file system implementation for scheme 'hdfs'
Flink Hadoop dependency problem
Hi, with Flink 1.10.0 I placed flink-shaded-hadoop-2-uber-2.7.5-10.0.jar in the jobmanager's lib directory, but submitting a job from the web UI or the CLI still reports: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded. What am I missing?
A Flink checkpoint question
Hi, I have a question about how Flink checkpoints work.
How to create keyed state
After keyBy, how do I use a StateDescriptor to create keyed state?
StreamingFileSink writing to an HA Hadoop cluster from a yarn job
How can a StreamingFileSink write to an HA-enabled Hadoop cluster when submitted as a yarn job?
Mobile: 18611696624  QQ: 79434564
Flink job failure and recovery
Hi, my Flink job failed (with a NullPointerException). I have three questions about recovering it from checkpoints and savepoints.