Re: Does the Flink connector support ignoring delete messages?

2023-07-10, by yh z
Hi, shi franke. You can try implementing your own DynamicTableSink and adding a
"sink.ignore-delete" option to it. For reference, there are implementations on GitHub, e.g.
this ClickHouse connector:
https://github.com/liekkassmile/flink-connector-clickhouse-1.13
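For illustration, a minimal sketch of what the runtime side of such an option could look like. Everything here is hypothetical (the class name and the flag are made up, and a real DynamicTableSink would build the inner SinkFunction from its runtime provider); it only shows the idea of dropping retractions before they reach the wrapped sink:

import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;

// Drops DELETE / UPDATE_BEFORE rows so the wrapped sink only sees an
// effectively append-only stream; "ignoreDelete" would be populated from
// the hypothetical "sink.ignore-delete" option.
public class IgnoreDeleteSinkFunction implements SinkFunction<RowData> {

    private final SinkFunction<RowData> delegate;
    private final boolean ignoreDelete;

    public IgnoreDeleteSinkFunction(SinkFunction<RowData> delegate, boolean ignoreDelete) {
        this.delegate = delegate;
        this.ignoreDelete = ignoreDelete;
    }

    @Override
    public void invoke(RowData row, Context context) throws Exception {
        RowKind kind = row.getRowKind();
        if (ignoreDelete && (kind == RowKind.DELETE || kind == RowKind.UPDATE_BEFORE)) {
            return; // silently drop the retraction
        }
        delegate.invoke(row, context);
    }
}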

shi franke wrote on Fri, Jul 7, 2023 at 19:24:

>
> Thanks for the reply; a custom implementation like that does work. We are currently on Flink 1.15. I wanted to check whether the community has framework-level support for this option, since I'd expect it to be a fairly common piece of configuration.
> junjie.m...@goupwith.com wrote on Fri, Jul 7, 2023 at 17:57:
>
> > You can filter on RowKind yourself with the DataStream API.
> > Sample code:
> > import org.apache.flink.api.common.functions.RichFlatMapFunction;
> > import org.apache.flink.types.Row;
> > import org.apache.flink.types.RowKind;
> > import org.apache.flink.util.Collector;
> >
> > /**
> >  * Filter function that keeps only append (insert) records.
> >  */
> > public class AppendOnlyFilterFunction extends RichFlatMapFunction<Row, Row> {
> >
> > private boolean includedUpdateAfter = false;
> >
> > public AppendOnlyFilterFunction() {
> > }
> >
> > public AppendOnlyFilterFunction(boolean includedUpdateAfter) {
> > this.includedUpdateAfter = includedUpdateAfter;
> > }
> >
> > @Override
> > public void flatMap(Row row, Collector<Row> collector) throws
> > Exception {
> > if (RowKind.INSERT == row.getKind()) {
> > collector.collect(row);
> > } else if (includedUpdateAfter && RowKind.UPDATE_AFTER ==
> > row.getKind()) {
> > row.setKind(RowKind.INSERT);
> > collector.collect(row);
> > }
> > }
> >
> > }
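> >
> > A possible usage (a sketch, assuming an upsert/retract DataStream<Row> named input;
> > you may need .returns(...) to supply the Row type information explicitly):
> > DataStream<Row> appendOnly = input.flatMap(new AppendOnlyFilterFunction(true));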
> >
> > From: shi franke
> > Sent: 2023-07-07 17:33
> > To: user-zh
> > Subject: Does the Flink connector support ignoring delete messages?
> > A question for the group: do connectors currently have an option to ignore delete
> > messages? For example, when the upstream is an upsert/retract stream, is there an option at
> > the connector to ignore delete messages and treat the stream as append-only, processing only
> > insert messages? I don't see such a feature in the current code, and I'm not sure whether
> > there is a related JIRA or implementation.
> >
>


Re: Exception when using the Kafka source on Flink 1.17.1

2023-07-05, by yh z
Hi, aiden. This looks like a class conflict. According to the official docs, you should not need to depend on flink-core or flink-connector-base when using Kafka (
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/kafka/
). If those two jars are needed for other reasons, you can run mvn dependency:tree (for
example mvn dependency:tree -Dincludes=org.apache.kafka) to see where
"org/apache/kafka/clients/consumer/ConsumerRecord" is pulled in a second time, and exclude
the copy that does not come from flink-connector-kafka.

aiden <18765295...@163.com> wrote on Tue, Jul 4, 2023 at 14:23:

> hi
>
> I hit the following exception when using the 1.17.1 Kafka source:
> Caused by: java.lang.LinkageError: loader constraint violation: loader
> (instance of org/apache/flink/util/ChildFirstClassLoader) previously
> initiated loading for a different type with name
> "org/apache/kafka/clients/consumer/ConsumerRecord"
> at java.lang.ClassLoader.defineClass1(Native Method)
> at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
> at
> java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
> at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
> at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
> at
> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:71)
> at
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:51)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.getDeclaredMethods0(Native Method)
> at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
> at java.lang.Class.getDeclaredMethod(Class.java:2128)
> at
> java.io.ObjectStreamClass.getPrivateMethod(ObjectStreamClass.java:1629)
> at java.io.ObjectStreamClass.access$1700(ObjectStreamClass.java:79)
> at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:520)
> at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:494)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:494)
> at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391)
> at
> java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:681)
> at
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
> at
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:534)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:522)
> at
> org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:67)
> at
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:471)
> at
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286)
> at
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223)
> ... 20 more
> Here is the relevant part of my POM:
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-core</artifactId>
>     <version>1.17.1</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-connector-kafka</artifactId>
>     <version>1.17.1</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-connector-base</artifactId>
>     <version>1.17.1</version>
> </dependency>
>
>
> It looks like a classloader problem. What do I need to change?
>


Re: FilterPushDown and parallelism for stream-table joins in Flink 1.16

2023-07-05, by yh z
Hi, Chai Kelun. Your filter condition contains your own UDF, so it does not satisfy the requirements for filter push-down: to the optimizer a UDF is non-deterministic, and it cannot extract a pushable predicate from it. If you want the push-down to happen, try extracting a deterministic condition, e.g. product.id > 10. Also, Flink does support broadcast hash join; if you want to control the join type between two particular tables, you can use a join hint to request a BROADCAST join, e.g. SELECT /*+ BROADCAST(product) */ ...
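To make the contract concrete, here is a minimal sketch of the SupportsFilterPushDown side (a fragment under stated assumptions: a real implementation would live inside a ScanTableSource, and the acceptance test is deliberately oversimplified). The planner only offers predicates it was able to extract, which is why a deterministic comparison such as product.id > 10 can appear in the list while a black-box UDF call stays at the join:

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;

public class ProductFilterPushDown implements SupportsFilterPushDown {

    @Override
    public Result applyFilters(List<ResolvedExpression> filters) {
        List<ResolvedExpression> accepted = new ArrayList<>();
        List<ResolvedExpression> remaining = new ArrayList<>();
        for (ResolvedExpression filter : filters) {
            // Accept only a simple built-in comparison here; everything else,
            // including calls to user-defined functions, stays in the plan.
            if (filter instanceof CallExpression
                    && ((CallExpression) filter).getFunctionDefinition()
                            == BuiltInFunctionDefinitions.GREATER_THAN) {
                accepted.add(filter);
            } else {
                remaining.add(filter);
            }
        }
        return Result.of(accepted, remaining);
    }
}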

Chai Kelun wrote on Mon, Jul 3, 2023 at 17:58:

> There is a Kafka stream table logClient(id int, name string, price double) and a
> customConnector dimension table product(id int, name string, value double) that implements
> SupportsFilterPushDown. A custom function MyUDF(double d1, double d2) implements the business
> logic, and the connector supports pushing that operator down.
> In the stream-table join scenario, the SQL below does not push the operator down; instead all
> operators are lifted to the join node via a TableScan. Is there an option to enable the
> push-down? (something like a nested-loop join, pushing the computation to the product table's
> data source)
> SELECT A.id, A.name, B.name FROM logClient AS A, product AS B WHERE
> MyUDF(B.value, A.price) < xxx;
> Also, Kafka and customConnector both support parallelism. For the join we would like a
> BROADCAST mode, evaluating the product table on every partition of the logClient stream, but
> it seems the distribution modes for Flink stream-table joins currently support only SINGLETON
> and HASH[KEY] (the switch case at StreamExecExchange.java line 106). Will the community
> consider supporting more distributionTypes in the future?
>
> Thanks a lot!


Re: Can flink cdc sync DDL statements?

2022-10-10, by yh z
Currently the community's Flink CDC can read the DDL statements in the binlog, but it does not pass them downstream, i.e. the downstream cannot observe schema changes.

Xuyang wrote on Mon, Oct 10, 2022 at 16:46:

> Hi, this is currently not supported.
> On 2022-09-26 23:27:05, "casel.chen" wrote:
> >Can flink cdc sync DDL statements to achieve schema synchronization? For example create
> >table, create index, truncate table, and so on.
>


Re: In Flink's data transfer, do upstream operators push to downstream ones or does downstream pull from upstream, and what is the design rationale?

2022-09-20, by yh z
Hello. Flink uses a pull model. The advantages of the pull model: 1. better scalability (downstream consumers can be added on demand; they only need the upstream consumption offset); 2. downstream consumers can adjust their consumption rate to their own needs; 3. network transfer: Flink tried a push model in the past, and since TCP connections are multiplexed between processes to save overhead, a performance bottleneck in one task thread kept every task thread on the whole link from receiving data, hurting the overall consumption rate. The advantage of the push model: lower overhead, since no mechanism is needed to keep polling the upstream node for data.

Xuyang wrote on Fri, Sep 9, 2022 at 20:35:

> Hi, it is mainly a pull model: the downstream actively pulls data from the upstream. When the downstream's consumption capacity reaches its limit, the backpressure mechanism makes the upstream produce less data.
>
>
>
>
>
>
>
> --
>
> Best!
> Xuyang
>
>
>
>
>
> On 2022-09-09 19:04:27, "郑 致远" wrote:
> >Hi all,
> >a quick question:
> >in Flink's data transfer, do upstream operators push to downstream ones, or do downstream operators pull from upstream, and what was the thinking behind this design?
>


Re: Question about the flink hybrid source

2022-09-20, by yh z
Hello. A hybrid source currently has to be built from FLIP-27 sources (e.g. FileSource, KafkaSource); non-FLIP-27 sources need some modification before they can be used. If you would like to get involved in extending the hybrid source, you can join the Flink community on Slack and start a discussion there. For source-related documentation, see the website and the FLIP design and discussion pages (
https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source
) (
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
). Hope this helps.
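A minimal sketch of the switch-over, following the FLIP-150 examples (assumptions: Flink 1.15+, flink-connector-files and flink-connector-kafka on the classpath; the path, broker and topic are placeholders):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HybridSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Bounded source: the historical data, read first.
        FileSource<String> fileSource = FileSource
                .forRecordStreamFormat(new TextLineInputFormat(), new Path("/bootstrap/data"))
                .build();

        // Unbounded source: the incremental data, read once the files are done.
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setTopics("topic")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        HybridSource<String> hybrid = HybridSource.<String>builder(fileSource)
                .addSource(kafkaSource)
                .build();

        env.fromSource(hybrid, WatermarkStrategy.noWatermarks(), "hybrid-source").print();
        env.execute();
    }
}

For MaxCompute, the bounded half would need a FLIP-27 style source of its own, which is exactly the part that may have to be written by hand.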

casel.chen wrote on Mon, Sep 19, 2022 at 19:42:

> I have a real-time Flink computation scenario where I first read the existing data of a table
> from MaxCompute and then read the incremental data from the corresponding Kafka topic,
> processing both together. This looks like a job for a hybrid source. The latest community
> release provides Kafka/Hive/File sources; do sources for other systems have to be developed by
> ourselves, and is there a place to contribute sources to the community?
> Are there any articles or blog posts on writing a custom source on the new source
> architecture? Thanks!


Re: Why is a NullPointerException thrown here, and is it related to the source table data?

2022-09-18, by yh z
Hi, it looks like some dirty-data handling logic is needed here. Could you open an issue on JIRA with more detailed error information and, if possible, the dirty records involved? After creating the issue, please reply in this thread.

Asahi Lee wrote on Wed, Sep 14, 2022 at 09:33:

> Yes, the error occurs after the job has been running for a while. Should the code at the null-pointer site be doing a null check?
>
>
>
>
> ------ Original message ------
> From: "user-zh" <xyzhong...@163.com>
> Sent: Friday, September 9, 2022, 8:37 PM
> To: "user-zh"
> Subject: Re: Why is a NullPointerException thrown here, and is it related to the source table data?
>
>
>
> Hi, it looks like a dirty record was hit. Did the error appear suddenly after the job had been running for a while?
>
>
>
>
>
>
>
> --
>
>  Best!
>  Xuyang
>
>
>
>
>
> At 2022-09-09 11:46:47, "Asahi Lee" wrote:
> 2022-09-09 11:36:42,866 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source:
> HiveSource-ods_jt_hrs.ods_hrmis_HR_EMPL_Education (1/1)
> (2a68412dab3602a1eeda5a750b308e23) switched from RUNNING to FAILED on
> container_1658144991761_106260_01_02 @ hhny-cdh05 (dataPort=45015).
> java.lang.RuntimeException: null
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105)
> ~[flink-dist_2.11-1.14.3.jar:1.14.3]
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
> ~[flink-dist_2.11-1.14.3.jar:1.14.3]
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:43)
> ~[flink-dist_2.11-1.14.3.jar:1.14.3]
> at
> org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:196)
> ~[flink-dist_2.11-1.14.3.jar:1.14.3]
> at
> org.apache.flink.streaming.api.operators.source.NoOpTimestampsAndWatermarks$TimestampsOnlyOutput.collect(NoOpTimestampsAndWatermarks.java:97)
> ~[flink-dist_2.11-1.14.3.jar:1.14.3]
> at
> org.apache.flink.streaming.api.operators.source.NoOpTimestampsAndWatermarks$TimestampsOnlyOutput.collect(NoOpTimestampsAndWatermarks.java:91)
> ~[flink-dist_2.11-1.14.3.jar:1.14.3]
> at
> org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:45)
> ~[flink-table_2.11-1.14.3.jar:1.14.3]
> at
> org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:35)
> ~[flink-table_2.11-1.14.3.jar:1.14.3]
> at
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
> ~[flink-table_2.11-1.14.3.jar:1.14.3]
> at
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:350)
> ~[flink-dist_2.11-1.14.3.jar:1.14.3]
> at 
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
> ~[flink-dist_2.11-1.14.3.jar:1.14.3]
> at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> ~[flink-dist_2.11-1.14.3.jar:1.14.3]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
> ~[flink-dist_2.11-1.14.3.jar:1.14.3]
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
> ~[flink-dist_2.11-1.14.3.jar:1.14.3]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
> ~[flink-dist_2.11-1.14.3.jar:1.14.3]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
> ~[flink-dist_2.11-1.14.3.jar:1.14.3]
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
> ~[flink-dist_2.11-1.14.3.jar:1.14.3]
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
> ~[flink-dist_2.11-1.14.3.jar:1.14.3]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
> ~[flink-dist_2.11-1.14.3.jar:1.14.3]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
> ~[flink-dist_2.11-1.14.3.jar:1.14.3]
> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_181]
> Caused by: java.lang.NullPointerException
> at
> org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:118)
> ~[flink-table_2.11-1.14.3.jar:1.14.3]
> at
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.toBinaryRow(RowDataSerializer.java:204)
> ~[flink-table_2.11-1.14.3.jar:1.14.3]
> at
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:103)
> ~[flink-table_2.11-1.14.3.jar:1.14.3]
> at
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:48)
> ~[flink-table_2.11-1.14.3.jar:1.14.3]
> at
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:168)
> ~[flink-dist_2.11-1.14.3.jar:1.14.3]
> at
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46)
> ~[flink-dist_2.11-1.14.3.jar:1.14.3]
> at
> 

Re: Questions about flink table store

2022-09-13, by yh z
Hello. Speaking just for myself, I think flink-table-store is positioned differently from hudi and iceberg. Hudi and iceberg are more like formats: they manage schema information through the format and address industry pain points, and they are not bound to a particular compute engine. Hudi solves upserts at very large data volumes, and iceberg addresses OSS
storage and moving to the cloud, but in essence both remain storage formats. That is both their strength and their weakness: the strength is being independent of engines and focused on the format layer itself; the weakness is that they cannot take part in the future planning of the mainstream engines, are not easy to extend, and their evolution is constrained, so they cannot move quickly into areas such as
OLAP. flink-table-store is closer to Spark's Delta: a real-time data warehouse solution deeply integrated with Flink and serving as Flink's storage. Building on Flink's strong stream processing, it can satisfy real-time warehouse needs more simply, evolve with the plans of future Flink
versions, participate in the community, and better meet user needs.
   In summary, flink-table-store is positioned not merely as data lake storage, but as a real-time data warehouse solution built on data lake storage, deeply integrated with and complementary to Flink.
Kyle Zhang wrote on Thu, Sep 8, 2022 at 08:37:

> Hi all,
> The table store introduction also talks about data lake storage and real-time streaming
> reads; how does its positioning differ from projects like iceberg and hudi, and why develop another project?
>
> Best.
>


Re: After a savepoint is triggered, does the source operator stop consuming at the corresponding offset?

2022-09-08, by yh z
Hi. In my understanding a savepoint plays a role similar to a checkpoint; before Flink 1.16 only full savepoints were supported, and underneath both are implemented with the barrier mechanism. According to the 1.16 planning document (
https://cwiki.apache.org/confluence/display/FLINK/FLIP-203%3A+Incremental+savepoints
), savepoints will also support an incremental mode.
When a savepoint is triggered, the source saves its state and does stop consuming.

郑 致远 wrote on Thu, Sep 8, 2022 at 19:39:

> Hi all,
> a question:
> is a savepoint also implemented with the barrier mechanism?
> When a savepoint is triggered, does the source operator stop consuming from Kafka?
>


Re: hello flink

2022-09-02, by yh z
Hello

yh z wrote on Fri, Sep 2, 2022 at 11:51:

> hello flink
>


How to debug s3p locally

2022-05-19, by z y xing
Hi all:
I understand that for a real deployment the jar is copied into the plugins directory, but when debugging, how should the s3p file system be initialized?

Flink version 1.14, Windows 10.
The project was created through flink-quick-start, with the following dependency added to the pom:


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-s3-fs-presto</artifactId>
    <version>${flink.version}</version>
</dependency>


The initialization code looks roughly like this:

Configuration fileSystemConf = new Configuration();

fileSystemConf.setBoolean("presto.s3.connection.ssl.enabled", false);
fileSystemConf.setString("presto.s3.access-key", "minioadmin");
fileSystemConf.setString("presto.s3.secret-key", "minioadmin");
fileSystemConf.setString("presto.s3.endpoint", "http://127.0.0.1:9000;);

FileSystem.initialize(fileSystemConf);

Path path = new Path("s3p://test/");
System.out.println(path.getFileSystem().exists(path));

But it throws the following exception:
Exception in thread "main"
org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
find a file system implementation for scheme 's3p'. The scheme is directly
supported by Flink through the following plugin: flink-s3-fs-presto. Please
ensure that each plugin resides within its own subfolder within the plugins
directory. See
https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html for
more information. If you want to use a Hadoop file system for that scheme,
please add the scheme to the configuration fs.allowed-fallback-filesystems.
For a full list of supported file systems, please see
https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.
at
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:515)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274)
at org.example.StreamingJob.main(StreamingJob.java:58)

Strangely enough, s3a works for me, with this initialization:

fileSystemConf.setBoolean("fs.s3a.connection.ssl.enabled", false);
fileSystemConf.setString("fs.s3a.endpoint", "http://127.0.0.1:9000;);
fileSystemConf.setString("fs.s3a.access.key", "minioadmin");
fileSystemConf.setString("fs.s3a.secret.key", "minioadmin");
fileSystemConf.setString("fs.s3a.path.style.access", "true");
fileSystemConf.setString("fs.s3a.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem");


Thanks!


flink sql join question

2021-09-28, by z
hi: a Kafka stream is consumed with Flink SQL [...] and joined with a MySQL table A
on its id and ts columns [...]
[...] 0:01 [...] 1:55 [...] A [...] 0:01 [...] A [...] join [...]
[...] 10+5 [...] 15 [...]

How to load tables and UDFs dynamically

2020-10-09, by Zeahoo Z
Hello, I have run into the following difficulty during development.

The table and UDF definitions are currently written into the conf/sql-client-defaults.yaml
file. The program runs fine, but this brings one problem: whenever a table or a UDF file needs to be added or modified, the Flink program has to be restarted. Is there a way to add or update my tables and UDFs dynamically (new tables can be added through the sql-client, but we would rather keep the table definitions in a file, and UDFs get both modified and added)? That way Flink would not need to restart.
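One direction, as a sketch only (not from this thread; it assumes Flink 1.13+ and a hypothetical conf/tables.sql script, and it does not make an already-running job pick up changes, it just keeps the DDL in a file instead of sql-client-defaults.yaml): execute the statements with TableEnvironment#executeSql at startup.

import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DdlLoader {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Run every ';'-separated statement from the script:
        // CREATE TABLE ..., CREATE FUNCTION ..., etc. (naive splitting;
        // it assumes no literal ';' inside the statements).
        String script = new String(Files.readAllBytes(Paths.get("conf/tables.sql")));
        for (String stmt : script.split(";")) {
            if (!stmt.trim().isEmpty()) {
                tEnv.executeSql(stmt);
            }
        }
    }
}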


Re: flink 1.11 taskmanager memory usage exceeds the configured size

2020-09-09, by Z-Z
Could it be RocksDB [...]?




------ Original message ------
From: "Z-Z"



flink 1.11 taskmanager memory usage exceeds the configured size

2020-09-09, by Z-Z
Hi all,
Flink runs as a docker session cluster; the taskmanager is configured with:
taskmanager.memory.process.size: 5120m
taskmanager.memory.jvm-metaspace.size: 1024m

but the taskmanager process actually uses 7.5G. The taskmanager's startup log prints:
INFO [] - Final TaskExecutor Memory configuration:
INFO [] -   Total Process Memory:          5.000gb (5368709120 bytes)
INFO [] -     Total Flink Memory:          3.500gb (3758096376 bytes)
INFO [] -       Total JVM Heap Memory:     1.625gb (1744830433 bytes)
INFO [] -         Framework:               128.000mb (134217728 bytes)
INFO [] -         Task:                    1.500gb (1610612705 bytes)
INFO [] -       Total Off-heap Memory:     1.875gb (2013265943 bytes)
INFO [] -         Managed:                 1.400gb (1503238572 bytes)
INFO [] -         Total JVM Direct Memory: 486.400mb (510027371 bytes)
INFO [] -           Framework:             128.000mb (134217728 bytes)
INFO [] -           Task:                  0 bytes
INFO [] -           Network:               358.400mb (375809643 bytes)
INFO [] -     JVM Metaspace:               1024.000mb (1073741824 bytes)
INFO [] -     JVM Overhead:                512.000mb (536870920 bytes)

The JDK is 1.8, and jmap cannot dump the process, even with -F:
1: Unable to open socket file: target process not responding or HotSpot VM not
loaded


[...]

Re: Flink Cli restore problem

2020-07-20, by Z-Z
With the CLI [...] on the jobmanager: bin/flink run -d -p 1 -s {savepointuri}
/data/test.jar
[...] with the webui (http://jobmanager:8081), "Submit New
Job" [...] uploading the jar [...] savepoint path [...]





------ Original message ------
From: "user-zh"



Re: Flink Cli restore problem

2020-07-19, by Z-Z
The taskmanager log [...] cli [...] webui [...]:
2020-07-20 03:29:25,959 WARN 
org.apache.kafka.clients.consumer.ConsumerConfig
   - The configuration 'value.serializer' was supplied but 
isn't a known config.
2020-07-20 03:29:25,959 INFO 
org.apache.kafka.common.utils.AppInfoParser 
- Kafka version : 0.11.0.2
2020-07-20 03:29:25,959 INFO 
org.apache.kafka.common.utils.AppInfoParser 
- Kafka commitId : 73be1e1168f91ee2
2020-07-20 03:29:25,974 ERROR 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder 
- Caught unexpected exception.
java.lang.ArrayIndexOutOfBoundsException: 0
at 
org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.hasMetaDataFollowsFlag(RocksSnapshotUtil.java:45)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateData(RocksDBFullRestoreOperation.java:223)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:168)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:151)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)
2020-07-20 03:29:25,974 WARN 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure - 
Exception while restoring keyed state backend for 
StreamMap_caf773fe289bfdb867e0b4bd0c431c5f_(1/1) from alternative (1/1), will 
retry while more alternatives are available.
org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected 
exception.
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at 

Re: Flink Cli restore problem

2020-07-17, by Z-Z
Flink 1.10.0; the taskmanager reports [...]:


2020-07-17 15:06:43,913 ERROR 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder 
- Caught unexpected exception.
java.io.EOFException
at java.io.DataInputStream.readFully(DataInputStream.java:197)
at java.io.DataInputStream.readFully(DataInputStream.java:169)
at 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:85)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateData(RocksDBFullRestoreOperation.java:221)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:168)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:151)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)
2020-07-17 15:06:43,914 WARN 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure - 
Exception while restoring keyed state backend for 
KeyedCoProcessOperator_00360b8021b192d84949201d4fea80f2_(1/1) from alternative 
(1/1), will retry while more alternatives are available.
org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected 
exception.
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at 

Flink Cli restore problem

2020-07-17, by Z-Z
I trigger a savepoint through the REST API ([...] /jobs/overview
 --- /jobs/{jobid}/savepoints ---
/jobs/{jobid}/savepoints/{triggerid}); restarting the flink job from that savepoint via the webui (uploading the jar [...] savepoint path [...]) fails. The jobmanager log:
2020-07-17 09:51:48,925 INFO 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - 
Request slot with profile ResourceProfile{UNKNOWN} for job 
7639673873b707aa86c4387aa7b4aac3 with allocation id 
e8865cdbfe4c3c33099c7112bc2e3231.
2020-07-17 09:51:48,952 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   
 - Source: Custom Source -> Filter (1/1) 
(1177659bff014e8dbc3f0508055d4307) switched from SCHEDULED to DEPLOYING.
2020-07-17 09:51:48,952 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   
 - Deploying Source: Custom Source -> Filter (1/1) (attempt #0) to 
e63d829deafc144cd82efd73979dd056 @ 083f69d029de (dataPort=35758)
2020-07-17 09:51:48,953 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   
 - Source: Custom Source (1/1) (141f0dc22b624b39e21127f637ba63c2) 
switched from SCHEDULED to DEPLOYING.
2020-07-17 09:51:48,953 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   
 - Deploying Source: Custom Source (1/1) (attempt #0) to 
e63d829deafc144cd82efd73979dd056 @ 083f69d029de (dataPort=35758)
2020-07-17 09:51:48,954 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   
 - Source: Custom Source (1/1) (274b3df03e1fab627059c1a78e4a26da) 
switched from SCHEDULED to DEPLOYING.
2020-07-17 09:51:48,954 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   
 - Deploying Source: Custom Source (1/1) (attempt #0) to 
e63d829deafc144cd82efd73979dd056 @ 083f69d029de (dataPort=35758)
2020-07-17 09:51:48,954 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   
 - Co-Process (1/1) (d0309f26a545e74643382ed3f758269b) switched from 
SCHEDULED to DEPLOYING.
2020-07-17 09:51:48,954 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   
 - Deploying Co-Process (1/1) (attempt #0) to 
e63d829deafc144cd82efd73979dd056 @ 083f69d029de (dataPort=35758)
2020-07-17 09:51:48,955 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   
 - Co-Process -> (Sink: Unnamed, Sink: Unnamed) (1/1) 
(618b75fcf5ea05fb5c6487bec6426e31) switched from SCHEDULED to DEPLOYING.
2020-07-17 09:51:48,955 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   
 - Deploying Co-Process -> (Sink: Unnamed, Sink: Unnamed) (1/1) 
(attempt #0) to e63d829deafc144cd82efd73979dd056 @ 083f69d029de (dataPort=35758)
2020-07-17 09:51:49,346 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   
 - Co-Process -> (Sink: Unnamed, Sink: Unnamed) (1/1) 
(618b75fcf5ea05fb5c6487bec6426e31) switched from DEPLOYING to RUNNING.
2020-07-17 09:51:49,370 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   
 - Source: Custom Source (1/1) (274b3df03e1fab627059c1a78e4a26da) 
switched from DEPLOYING to RUNNING.
2020-07-17 09:51:49,370 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   
 - Source: Custom Source (1/1) (141f0dc22b624b39e21127f637ba63c2) 
switched from DEPLOYING to RUNNING.
2020-07-17 09:51:49,377 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   
 - Co-Process (1/1) (d0309f26a545e74643382ed3f758269b) switched from 
DEPLOYING to RUNNING.
2020-07-17 09:51:49,377 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   
 - Source: Custom Source -> Filter (1/1) 
(1177659bff014e8dbc3f0508055d4307) switched from DEPLOYING to RUNNING.
2020-07-17 09:51:49,493 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   
 - Co-Process (1/1) (d0309f26a545e74643382ed3f758269b) switched from 
RUNNING to FAILED.
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state 
backend for 

Flink Hadoop dependency problem

2020-07-14, by Z-Z
Flink 1.11.0 is deployed with docker-compose; the docker-compose file is:
version: "2.1"
services:
 jobmanager:
  image: flink:1.11.0-scala_2.12
  expose:
   - "6123"
  ports:
   - "8081:8081"
  command: jobmanager
  environment:
   - JOB_MANAGER_RPC_ADDRESS=jobmanager
   - 
HADOOP_CLASSPATH=/data/hadoop-2.9.2/etc/hadoop:/data/hadoop-2.9.2/share/hadoop/common/lib/*:/data/hadoop-2.9.2/share/hadoop/common/*:/data/hadoop-2.9.2/share/hadoop/hdfs:/data/hadoop-2.9.2/share/hadoop/hdfs/lib/*:/data/hadoop-2.9.2/share/hadoop/hdfs/*:/data/hadoop-2.9.2/share/hadoop/yarn:/data/hadoop-2.9.2/share/hadoop/yarn/lib/*:/data/hadoop-2.9.2/share/hadoop/yarn/*:/data/hadoop-2.9.2/share/hadoop/mapreduce/lib/*:/data/hadoop-2.9.2/share/hadoop/mapreduce/*:/contrib/capacity-scheduler/*.jar
  volumes:
   - ./jobmanager/conf:/opt/flink/conf
   - ./data:/data


 taskmanager:
  image: flink:1.11.0-scala_2.12
  expose:
   - "6121"
   - "6122"
  depends_on:
   - jobmanager
  command: taskmanager
  links:
   - "jobmanager:jobmanager"
  environment:
   - JOB_MANAGER_RPC_ADDRESS=jobmanager
  volumes:
   - ./taskmanager/conf:/opt/flink/conf
networks:
 default:
  external:
   name: flink-network



hadoop-2.9.2 sits under the data directory; with HADOOP_CLASSPATH set [...] jobmanager [...] taskmanager [...], submitting via the cli or the webui still makes the jobmanager report: Could
 not find a file system implementation for scheme 'hdfs'

Flink Hadoop dependencies

2020-07-07, by Z-Z
Hi all: on Flink
1.10.0, after putting flink-shaded-hadoop-2-uber-2.7.5-10.0.jar into the jobmanager's lib directory [...], submitting via the webui or the cli still reports: Could
 not find a file system implementation for scheme 'hdfs'. The scheme is not
directly supported by Flink and no Hadoop file system to support this scheme
could be loaded. [...]

Flink checkpoint question

2020-07-06, by Z-Z
Hi all:
[...] Flink [...] checkpoint [...] checkpoint [...]?

A question about keyed state

2020-06-10, by Z-Z
[...] after keyBy [...] StateDescriptor [...] keyed
state [...]?

StreamingFileSink writing to an HA Hadoop cluster from a yarn job

2020-06-09, by ???Z?w???w

How can StreamingFileSink write to an HA-enabled Hadoop cluster when running as a yarn job?


[...]

[...]
=============
Mobile: 18611696624
QQ: 79434564




Flink job failure and recovery question

2020-06-08, by Z-Z
Hi all:
My Flink job fails with an exception (a NullPointerException) [...] checkpoint [...] savepoint [...]:
1. [...] Flink [...]
2. [...] checkpoint [...] savepoint [...] savepoint [...]
3. [...]