Re: Unsubscribe

2022-12-27 Post by Lijie Wang
Hi, to unsubscribe, send an email with any content to user-zh-unsubscr...@flink.apache.org

Best,
Lijie

DannyLau wrote on Tue, Dec 27, 2022 at 09:54:

> Unsubscribe
> 刘朝兵


Re: Submitting a Flink job on k8s with OSS as the checkpoint location, but the oss scheme cannot be found

2022-11-07 Post by Lijie Wang
flink-oss-fs-hadoop-1.13.6.jar needs to be placed in Flink's lib directory.

Best,
Lijie

highfei2011 wrote on Tue, Nov 1, 2022 at 16:23:

> It was a dependency conflict.
>
>
> On Nov 1, 2022 15:39, highfei2011 wrote:
>
>
> Flink version: Apache Flink 1.13.6; Flink operator version: 1.2.0
> Submit command: kubernetes-jobmanager.sh kubernetes-application
> Exception: Caused by:
> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
> find a file system implementation for scheme 'oss'. The scheme is directly
> supported by Flink through the following plugin: flink-oss-fs-hadoop.
> Please ensure that each plugin resides within its own subfolder within the
> plugins directory. See https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html
> for more information. If you want to use a Hadoop
> file system for that scheme, please add the scheme to the configuration
> fs.allowed-fallback-filesystems. For a full list of supported file
> systems, please see
> https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.
> I checked the /opt/flink/opt/ directory, and flink-oss-fs-hadoop-1.13.6.jar is there. Note: it works fine
> in local testing; the exception above only occurs when submitting through the Flink operator.


Re: PartitionNotFoundException

2022-09-28 Post by Lijie Wang
Hi,

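You can try increasing taskmanager.network.request-backoff.max. Its default is 10000 (milliseconds), i.e. 10 s.
Upstream and downstream tasks may be deployed concurrently, so a downstream task can request a partition before the
upstream task has finished deploying. Increasing taskmanager.network.request-backoff.max gives the downstream task a
longer wait and more retries, which reduces the chance of a PartitionNotFoundException.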
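The effect of raising the maximum can be modeled in a few lines. This is a sketch, not Flink code. It assumes three things. First, the wait starts at `taskmanager.network.request-backoff.initial` (default 100 ms) and doubles on each retry. Second, the wait is capped at `taskmanager.network.request-backoff.max`. Third, the consumer gives up after the capped attempt.

```python
def backoff_schedule(initial_ms: int = 100, max_ms: int = 10000) -> list[int]:
    """Successive waits (ms) before each partition-request retry.

    Simplified model (an assumption, not Flink source): the first retry waits
    initial_ms, every later retry doubles the previous wait, the wait is
    capped at max_ms, and no further retry happens once the cap was used.
    """
    if initial_ms <= 0 or max_ms < initial_ms:
        raise ValueError("need 0 < initial_ms <= max_ms")
    waits = [initial_ms]
    while waits[-1] < max_ms:
        waits.append(min(waits[-1] * 2, max_ms))
    return waits


for max_ms in (10000, 60000):
    waits = backoff_schedule(max_ms=max_ms)
    print(f"max={max_ms} ms: {len(waits)} retries, {sum(waits)} ms in total")
```

Under this model the defaults give 8 retries spanning 22.7 s in total. Raising the max to 60000 ms gives 11 retries spanning about 162 s, which leaves a slow upstream deployment much more time to finish.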

Best,
Lijie

yidan zhao wrote on Wed, Sep 28, 2022 at 17:35:

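> By Flink's design, can a downstream task start requesting a partition before the upstream task has been deployed successfully? Also, would a failed upstream deployment normally leave related logs?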
>
> For now, restarting the cluster fixed it. I'll wait a while and see whether it happens again.
>
> Shammon FY wrote on Wed, Sep 28, 2022 at 15:45:
> >
> > Hi
> >
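> > The task reports PartitionNotFoundException because it sent a partition request to the upstream TaskManager. When the upstream TaskManager's netty server received that request, it found that the requested upstream task had not been deployed successfully.
> >
> > So judging from this exception, the netty connection itself works. Take the task that reported the PartitionNotFoundException and check why its upstream task was not deployed successfully.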
> >
> > On Tue, Sep 27, 2022 at 10:20 PM yidan zhao  wrote:
> >>
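> >> Addendum: Flink 1.15.2, standalone cluster, ZooKeeper-based HA.
> >> The environment is our company's in-house container platform. 3 containers run the JM + HistoryServer; the remaining several hundred containers are all TMs, each providing 1 slot.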
> >>
> >> yidan zhao wrote on Tue, Sep 27, 2022 at 22:07:
> >> >
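> >> > I also tried something today. The problem seems related to TMs not having been restarted for a long time: after a restart it happens much less often.
> >> > I keep many spare TMs, e.g. 500 TMs each providing 1 slot, while the job may only use 100 of them.
> >> > Could the connections of the remaining 400 TMs develop some kind of problem after a long time?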
> >> >
> >> > yidan zhao wrote on Tue, Sep 27, 2022 at 16:21:
> >> > >
> >> > > After enabling debug logging on the TMs, I found many log lines like this:
> >> > > Responding with error: class
> >> > > org.apache.flink.runtime.io.network.partition.PartitionNotFoundException
> >> > >
> >> > > The visible symptom right now is that after submitting the job, it keeps reporting LocalTransportException:
> >> > > org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
> >> > > Sending the partition request to '/10.216.187.171:8709 (#0)' failed.
> >> > > at org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:145)
> >> > > at org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:133)
> >> > > at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
> >> > > at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
> >> > > at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
> >> > > at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
> >> > > at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
> >> > > at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
> >> > > at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:1017)
> >> > > at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:878)
> >> > > at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
> >> > > at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
> >> > > at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
> >> > > at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
> >> > > at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
> >> > > at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
> >> > > at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
> >> > > at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
> >> > > at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> >> > > at java.lang.Thread.run(Thread.java:748)
> >> > > Caused by: org.apache.flink.shaded.netty4.io.netty.channel.StacklessClosedChannelException
> >> > > at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object, ChannelPromise)(Unknown Source)
> >> > >
> >> > > I'm not sure whether this is related to that debug log.
> >> > >
> >> > > What could be causing this problem? I had always suspected the network but never found the cause. Only today, after turning on debug logging, did I discover this debug-level error.
>


Re: Flink job stops abnormally

2022-09-27 Post by Lijie Wang
I suggest taking a memory dump of the TM to see exactly how the memory is being used.

Best,
Lijie

lxk wrote on Wed, Sep 28, 2022 at 09:46:

> Recently my Flink job stops by itself after running for a while. The only useful information I can find in the JM and TM logs is the following:
>
> 2022-09-24 07:18:16,303 INFO
> org.apache.flink.yarn.YarnTaskExecutorRunner [] - RECEIVED
> SIGNAL 15: SIGTERM. Shutting down as requested.
> 2022-09-24 07:18:16,323 INFO
> org.apache.flink.runtime.blob.PermanentBlobCache [] - Shutting
> down BLOB cache
> 2022-09-24 07:18:16,325 INFO
> org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager [] -
> Shutting down TaskExecutorLocalStateStoresManager.
>
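> From the related official issues, it seems to be caused by insufficient heap memory.
> I recorded GC logs and looked at them with GCeasy: memory keeps growing and is never released.
> The code only joins two streams, with the TTL set to 15 seconds, so it shouldn't use this much memory.
> Does anyone have ideas?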
>


Re: Differences between tags of the same Flink image version

2022-08-15 Post by Lijie Wang
Hi,
Yes, scala and java indicate the Scala and Java versions in the container.
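Here is a small sketch that reads a tag this way by splitting it into its Flink, Scala and Java parts. It assumes tags are dash-separated, as in `1.15.1-scala_2.12-java11`. A part missing from the tag comes back as `None`, meaning the tag does not pin that version. The function name is hypothetical.

```python
from typing import Optional


def parse_image_tag(tag: str) -> dict[str, Optional[str]]:
    """Split a Flink image tag such as '1.15.1-scala_2.12-java11' into parts.

    Assumes dash-separated components; a component that is absent from the
    tag is reported as None (the tag does not pin that version).
    """
    info: dict[str, Optional[str]] = {"flink": None, "scala": None, "java": None}
    for part in tag.split("-"):
        if part.startswith("scala_"):
            info["scala"] = part[len("scala_"):]
        elif part.startswith("java") and part[len("java"):].isdigit():
            info["java"] = part[len("java"):]
        elif part[:1].isdigit():
            info["flink"] = part
    return info


print(parse_image_tag("1.15.1-scala_2.12-java8"))
```

By this reading, a build produced with JDK 11 is better matched by a tag whose Java part is `11` than by one tagged `java8`.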

Best,
Lijie

hjw <1010445...@qq.com.invalid> wrote on Mon, Aug 15, 2022 at 15:00:

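> I noticed that the Flink Docker image has several tags for the same version.
> Taking 1.15.1 as an example, the tags include scala_2.12-java8, scala_2.12-java11, java8, and so on.
> Do scala and java indicate the Java version the container uses to start the Flink process?
> Also, when I build Flink from source with JDK 11 and put it into the image tagged java8, I run into incompatibilities.
> thx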
>  


Re: How to use external database connections correctly in Flink

2022-07-24 Post by Lijie Wang
Hi,
In my experience, when using a connection pool you must at least close each Statement/ResultSet promptly. Otherwise the query results stay cached and memory leaks.
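Here is the same rule as a minimal sketch. Python's built-in sqlite3 stands in for the JDBC driver or pool, and a cursor plays the role of Statement/ResultSet. The class and method names are hypothetical, chosen to mirror a Flink rich function's open/invoke/close lifecycle. In Java, the per-query part would normally be a try-with-resources block.

```python
import sqlite3
from contextlib import closing
from typing import Optional


class DimLookup:
    """Hypothetical stand-in for a Flink rich function that looks up a dimension table.

    open() runs once per parallel task, lookup() once per record and close()
    once at shutdown, mirroring the open/invoke/close lifecycle.
    """

    def __init__(self, database: str) -> None:
        self.database = database
        self.conn: Optional[sqlite3.Connection] = None

    def open(self) -> None:
        # One long-lived connection per task, created once and reused.
        self.conn = sqlite3.connect(self.database)

    def lookup(self, key: int) -> Optional[str]:
        if self.conn is None:
            raise RuntimeError("open() must be called before lookup()")
        # The cursor plays the role of Statement/ResultSet: it is closed after
        # every query so no result buffers accumulate between records.
        with closing(self.conn.cursor()) as cur:
            cur.execute("SELECT name FROM dim WHERE id = ?", (key,))
            row = cur.fetchone()
        return row[0] if row is not None else None

    def close(self) -> None:
        # The connection itself is released only when the task shuts down.
        if self.conn is not None:
            self.conn.close()
            self.conn = None


if __name__ == "__main__":
    fn = DimLookup(":memory:")
    fn.open()
    fn.conn.execute("CREATE TABLE dim (id INTEGER PRIMARY KEY, name TEXT)")
    fn.conn.execute("INSERT INTO dim VALUES (1, 'beijing')")
    print(fn.lookup(1), fn.lookup(2))
    fn.close()
```

The connection lives as long as the task. The per-query resources live only as long as one record's lookup.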

Best,
Lijie

lxk7...@163.com wrote on Sat, Jul 23, 2022 at 15:34:

>
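> Our current project needs real-time lookups against an external database. The main real-time stream is on the order of millions of records per day. The external stores available to us are MySQL, ClickHouse,
> and a Redis cache.
> Right now the data is written to ClickHouse in real time, and Flink looks up
> ClickHouse in real time. (I know ClickHouse's concurrency is weak, but it is the only option we have that can store tens of millions of rows.)
> I tested two approaches:
>
> 1. Query through a JDBC connection pool. I tried both the Druid and C3P0 pools, but in both cases the program hit OOM after running for a while (possibly I was using them incorrectly). When analyzing the dump, I found that the pool approach retained a lot of information, so in the end I didn't use it. Also, when using the pool inside Flink I did not close connections explicitly; I only closed the pool in the close method.
> 2. Get a connection via DriverManager and query with it. So far in testing, the program runs stably and has not hit OOM. I don't close the connection in this case either.
>
> Questions: 1. How should external database connections be used correctly inside Flink? With a pool, my understanding is that the pool manages the connections, so there is no need to close them explicitly. Also, since a real-time program keeps querying, the connection stays in use and needn't be closed. In short, for both the pool and direct connections, should the connection be closed in the invoke method, or only in the close method?
>   2. Besides caching, are there better ways to optimize this kind of real-time lookup? Or alternative approaches?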
>
>
> lxk7...@163.com
>


Re: Question about the naming of Flink source branches and tags

2022-07-20 Post by Lijie Wang
Hi,
1.15.1 should correspond to the tag release-1.15.1.
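The question hinges on one distinction. A release tag such as `release-1.15.1` is frozen at the released source. The release line's maintenance branch, assumed here to follow the `release-1.15` pattern, keeps receiving fixes after the release. The helper names below are hypothetical. The sketch only builds the git command that starts an internal branch from the tag.

```python
def release_tag(version: str) -> str:
    """Tag for a released version, e.g. '1.15.1' -> 'release-1.15.1'."""
    parts = version.split(".")
    if len(parts) != 3 or not all(p.isdigit() for p in parts):
        raise ValueError(f"expected MAJOR.MINOR.PATCH, got {version!r}")
    return f"release-{version}"


def maintenance_branch(version: str) -> str:
    """Release-line branch, e.g. '1.15.1' -> 'release-1.15' (assumed naming)."""
    release_tag(version)  # validates the version format
    major, minor, _ = version.split(".")
    return f"release-{major}.{minor}"


def checkout_command(version: str, local_branch: str) -> list[str]:
    """git command that starts a local branch exactly at the release tag."""
    return ["git", "checkout", "-b", local_branch, release_tag(version)]


print(" ".join(checkout_command("1.15.1", "internal-1.15.1")))
```

Basing internal changes on the tag keeps them on top of exactly the released source. Basing them on the maintenance branch would also pull in fixes that were not part of the 1.15.1 release.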

yidan zhao wrote on Thu, Jul 21, 2022 at 12:53:

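> I've looked into it. There is some pattern, but I still don't fully understand it.
> For example, I want to add some company-internal changes on top of the 1.15.1 release. Should I base them on a branch or on a tag?
> Which branch or tag has exactly the same source as the package linked from the official download page, i.e. the version without code that was developed later but not yet released?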
>


Re: Re: flink-hudi-hive

2022-07-12 Post by Lijie Wang
Hi,

Could you run jstack on the JM and share the stack? From the current symptoms, it looks like the JobMaster is stuck during initialization.

Best,
Lijie

ynz...@163.com wrote on Wed, Jul 13, 2022 at 09:56:

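> Yes, 192.168.10.227:35961 is the TM address.
> By "repeatedly initializing" I mean that on the Overview page of the Flink web UI, the status of the job in the Running Job
> List stays INITIALIZING.
> There are no TM logs, and I haven't figured out yet why it exits. The TM page of the Flink web UI shows no information at all the whole time.
> Below is the list of logs; I didn't find anything useful in them: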
> directory.info : Total file length is 7201 bytes.
> jobmanager.err : Total file length is 588 bytes.
> jobmanager.log : Total file length is 82894 bytes.
> jobmanager.out : Total file length is 0 bytes.
> launch_container.sh : Total file length is 21758 bytes.
> prelaunch.err : Total file length is 0 bytes.
> prelaunch.out : Total file length is 100 bytes.
>
>
>
> best,
> ynz...@163.com
>
> From: Weihua Hu
> Date: 2022-07-12 23:18
> To: user-zh
> Subject: Re: Re: flink-hudi-hive
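> From this log alone I can't see a continuous failover. Which job do you mean by "repeatedly initializing"?
> I see some akka connection exceptions, so perhaps the corresponding TM exited abnormally. Please confirm whether 192.168.10.227:35961 is a
> TaskManager address, and why it exited.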
>
> Best,
> Weihua
>
>
> On Tue, Jul 12, 2022 at 9:37 AM ynz...@163.com  wrote:
>
> > These are all the JobManager logs:
> > 2022-07-12 09:33:02,280 INFO
> > org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> > configuration property: execution.shutdown-on-attached-exit, false
> > 2022-07-12 09:33:02,280 INFO
> > org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> > configuration property: pipeline.jars,
> > file:/home/dataxc/opt/flink-1.14.4/opt/flink-python_2.11-1.14.4.jar
> > 2022-07-12 09:33:02,280 INFO
> > org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> > configuration property: execution.checkpointing.min-pause, 8min
> > 2022-07-12 09:33:02,280 INFO
> > org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> > configuration property: restart-strategy, failure-rate
> > 2022-07-12 09:33:02,280 INFO
> > org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> > configuration property: jobmanager.memory.jvm-metaspace.size, 128m
> > 2022-07-12 09:33:02,280 INFO
> > org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> > configuration property: state.checkpoints.dir, hdfs:///flink/checkpoints
> > 2022-07-12 09:33:02,382 WARN  akka.remote.transport.netty.NettyTransport [] - Remote connection to [null] failed with java.net.ConnectException: Connection refused: n103/192.168.10.227:35961
> > 2022-07-12 09:33:02,383 WARN  akka.remote.ReliableDeliverySupervisor [] - Association with remote system [akka.tcp://flink@n103:35961] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@n103:35961]] Caused by: [java.net.ConnectException: Connection refused: n103/192.168.10.227:35961]
> > 2022-07-12 09:33:02,399 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager at akka://flink/user/rpc/resourcemanager_1 .
> > 2022-07-12 09:33:02,405 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Starting the resource manager.
> > 2022-07-12 09:33:02,479 INFO  org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider [] - Failing over to rm2
> > 2022-07-12 09:33:02,509 INFO  org.apache.flink.yarn.YarnResourceManagerDriver [] - Recovered 0 containers from previous attempts ([]).
> > 2022-07-12 09:33:02,509 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Recovered 0 workers from previous attempt.
> > 2022-07-12 09:33:02,514 WARN  akka.remote.transport.netty.NettyTransport [] - Remote connection to [null] failed with java.net.ConnectException: Connection refused: n103/192.168.10.227:35961
> > 2022-07-12 09:33:02,515 WARN  akka.remote.ReliableDeliverySupervisor [] - Association with remote system [akka.tcp://flink@n103:35961] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@n103:35961]] Caused by: [java.net.ConnectException: Connection refused: n103/192.168.10.227:35961]
> > 2022-07-12 09:33:02,528 INFO  org.apache.hadoop.conf.Configuration [] - resource-types.xml not found
> > 2022-07-12 09:33:02,528 INFO  org.apache.hadoop.yarn.util.resource.ResourceUtils [] - Unable to find 'resource-types.xml'.
> > 2022-07-12 09:33:02,538 INFO  org.apache.flink.runtime.externalresource.ExternalResourceUtils [] - Enabled external resources: []
> > 2022-07-12 09:33:02,541 INFO  org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl [] - Upper bound of the thread pool size is 500
> > 2022-07-12 09:33:02,584 WARN  akka.remote.transport.netty.NettyTransport [] - Remote connection to [null] failed with java.net.ConnectException: Connection refused: n103/192.168.10.227:35961
> > 2022-07-12 09

Re: TaskManager never starts when deploying on k8s

2022-07-11 Post by Lijie Wang
Check whether the TM pods have started, and whether the TM log shows any exceptions. It looks like the TM never registered.

Best,
Lijie

陈卓宇 <2572805...@qq.com.invalid> wrote on Tue, Jul 12, 2022 at 10:53:

> Flink: 1.14.5
> When deploying on k8s, the TaskManager never starts, and there are no logs either.
> JobManager log:
> 2022-07-12 02:08:22,271 INFO 
> org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] -
> Creating new TaskManager pod with name iii5-taskmanager-1-1 and resource
> <1728,1.0>.
> 2022-07-12 02:08:22,286 WARN 
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AdminClientConfig
> [] - The configuration 'key.deserializer' was supplied but isn't a known
> config.
> 2022-07-12 02:08:22,286 WARN 
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AdminClientConfig
> [] - The configuration 'value.deserializer' was supplied but isn't a known
> config.
> 2022-07-12 02:08:22,286 WARN 
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AdminClientConfig
> [] - The configuration 'enable.auto.commit' was supplied but isn't a known
> config.
> 2022-07-12 02:08:22,287 WARN 
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AdminClientConfig
> [] - The configuration 'group.id' was supplied but isn't a known config.
> 2022-07-12 02:08:22,287 WARN 
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AdminClientConfig
> [] - The configuration 'client.id.prefix' was supplied but isn't a known
> config.
> 2022-07-12 02:08:22,287 WARN 
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AdminClientConfig
> [] - The configuration 'partition.discovery.interval.ms' was supplied but
> isn't a known config.
> 2022-07-12 02:08:22,287 WARN 
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AdminClientConfig
> [] - The configuration 'auto.offset.reset' was supplied but isn't a known
> config.
> 2022-07-12 02:08:22,287 INFO 
> org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser
> [] - Kafka version: unknown
> 2022-07-12 02:08:22,287 INFO 
> org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser
> [] - Kafka commitId: unknown
> 2022-07-12 02:08:22,287 INFO 
> org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser
> [] - Kafka startTimeMs: 1657591702287
> 2022-07-12 02:08:22,354 INFO 
> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator []
> - Starting the KafkaSourceEnumerator for consumer group
> hire_sign_contract_prod without periodic partition discovery.
> 2022-07-12 02:08:23,464 INFO 
> org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Pod
> iii5-taskmanager-1-1 is created.
> 2022-07-12 02:08:23,467 INFO 
> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator []
> - Discovered new partitions: [canal_hire_sign_v2-11, canal_hire_sign_v2-9,
> canal_hire_sign_v2-10, canal_hire_sign_v2-0, canal_hire_sign_v2-3,
> canal_hire_sign_v2-4, canal_hire_sign_v2-1, canal_hire_sign_v2-2,
> canal_hire_sign_v2-7, canal_hire_sign_v2-8, canal_hire_sign_v2-5,
> canal_hire_sign_v2-6]
> 2022-07-12 02:08:23,576 INFO 
> org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] -
> Received new TaskManager pod: iii5-taskmanager-1-1
> 2022-07-12 02:08:23,578 INFO 
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Requested worker iii5-taskmanager-1-1 with resource spec WorkerResourceSpec
> {cpuCores=1.0, taskHeapSize=384.000mb (402653174 bytes), taskOffHeapSize=0
> bytes, networkMemSize=128.000mb (134217730 bytes), managedMemSize=512.000mb
> (536870920 bytes), numSlots=1}.
>
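> It gets stuck here.
> After a while it reports a slot allocation exception, even though the machines have enough resources and it used to start fine.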


Re: Question about how to submit Flink jobs

2022-07-04 Post by Lijie Wang
Hi,
By "can't get the job id", do you mean the Flink job ID?
Also, how are you deploying? If it is session or per-job mode, where the job graph is compiled on the client, you can print
the job ID in the main method.
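For the monitoring use case, another option, not raised in this thread, is to read job IDs back from the JobManager's REST API instead of from logs. Here is a minimal sketch. It assumes the `GET /jobs/overview` endpoint returns a `jobs` list whose entries carry `jid` and `state` fields, and that the REST port is the default 8081. The host name and function names are placeholders.

```python
import json
import urllib.request


def parse_jobs_overview(payload: str) -> dict[str, str]:
    """Map job ID -> job state from a /jobs/overview response body."""
    data = json.loads(payload)
    return {job["jid"]: job["state"] for job in data.get("jobs", [])}


def fetch_job_states(rest_url: str, timeout_s: float = 10.0) -> dict[str, str]:
    """Query the JobManager REST endpoint, e.g. rest_url='http://jobmanager-host:8081'."""
    url = rest_url.rstrip("/") + "/jobs/overview"
    with urllib.request.urlopen(url, timeout=timeout_s) as resp:
        return parse_jobs_overview(resp.read().decode("utf-8"))
```

A monitor could call `fetch_job_states("http://<jobmanager-host>:8081")` periodically. This works regardless of the log level configured on the cluster.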

Best,
Lijie

sherlock zw wrote on Mon, Jul 4, 2022 at 17:51:

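> I need to monitor Flink jobs that have already been submitted.
> However, when submitting from the command line I can't get the job ID except by filtering INFO-level logs, and our environment's log level is WARN, so the log line with the job ID never appears. Is there any way to submit jobs other than the command line, e.g. a jar-based submission API like Spark's SparkLauncher? Any guidance would be appreciated, thanks.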
>
>
>


Re: Confusion about Managed Memory in Flink

2022-06-30 Post by Lijie Wang
Hi,
To add: that is why managed memory always shows as full in the Flink Web UI.
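Managed memory is a budget fixed from configuration when the TaskManager starts. It is not a measurement of what RocksDB actually uses, so the UI shows the whole budget as taken.

Here is a rough sketch of how that budget is derived. It assumes every memory option is left at its default and only `taskmanager.memory.process.size` is set. The comments name Flink's default fractions and bounds; the function name is hypothetical.

```python
def tm_memory_mb(process_mb: float) -> dict[str, float]:
    """Simplified TaskManager memory split, assuming Flink's default options."""
    metaspace = 256.0                                      # jvm-metaspace.size
    overhead = min(max(0.1 * process_mb, 192.0), 1024.0)   # jvm-overhead fraction, min, max
    total_flink = process_mb - metaspace - overhead
    managed = 0.4 * total_flink                            # managed.fraction
    network = min(max(0.1 * total_flink, 64.0), 1024.0)    # network fraction, min, max
    framework_heap = framework_off_heap = 128.0            # framework heap / off-heap sizes
    task_off_heap = 0.0                                    # task.off-heap.size
    task_heap = (total_flink - managed - network
                 - framework_heap - framework_off_heap - task_off_heap)
    return {"total_flink": total_flink, "managed": managed,
            "network": network, "task_heap": task_heap}


print(tm_memory_mb(1728))
```

A 1728 MB process gives 512 MB managed, 128 MB network and 384 MB task heap under this sketch. That is the same split as the `WorkerResourceSpec` logged in the k8s TaskManager thread above.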

Best,
Lijie

Lijie Wang wrote on Thu, Jun 30, 2022 at 17:56:

> Hi,
> This is normal. Flink most likely cannot see how much memory RocksDBStateBackend actually uses. You can think of managed memory
> as being set aside for the StateBackend right at the start.
>
> Best,
> Lijie
>
> 陈卓宇 <2572805...@qq.com.invalid> wrote on Thu, Jun 30, 2022 at 17:35:
>
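>> Flink version: 1.13.1
>> State backend: RocksDBStateBackend
>> Problem description:
>> After Flink started, I noticed that Managed Memory on the TaskManager in the web UI was fully used. I raised it from 512M to 2G and it was still full.
>> Then I changed 2G to 2kb, and the program still ran without errors. My understanding is that memory is an incompressible resource, so showing it as full should normally cause an error.
>> I don't understand why it is full without any error, and still full after being increased.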
>
>


Re: Flink k8s job submission flow

2022-06-27 Post by Lijie Wang
Hi,

Usage documentation:
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes
Design document:
https://docs.google.com/document/d/1-jNzqGF6NfZuwVaFICoFQ5HFFXzF5NVIagUZByFMfBY/edit?usp=sharing
jira: https://issues.apache.org/jira/browse/FLINK-9953

Best,
Lijie

hjw <1010445...@qq.com.invalid> wrote on Tue, Jun 28, 2022 at 00:11:

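> Flink version: 1.15.0
> How is the native k8s job submission flow implemented in Flink 1.15.0? Or, put differently, how is Flink on native k8s
> designed? I'd like to study it, so if anyone has relevant documentation, please let me know. Thanks :)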
>  


Re: Job cancel fails; some tasks stay in CANCELING state

2022-06-27 Post by Lijie Wang
Hi,

1. Please post the full TM log and the jstack output.
2. Check the GC logs to see whether GC is behaving normally.

Best,
Lijie

李辉 wrote on Mon, Jun 27, 2022 at 15:46:

> Help needed with the issue in the subject. Flink version 1.13.2, job deployed on k8s.
>
> 1. Overview:
>
>
> 2. Log of the hung TM; nothing else was logged after this, and there were no exceptions:
>
>
>
> 3. jstack analysis: no threads in BLOCKED state.
>
>
>


Re: Production Flink job suddenly has consecutive checkpoint failures

2022-06-23 Post by Lijie Wang
-> Caused by: org.apache.flink.util.SerializedThrowable: Unable to close
file because the last
blockBP-1965840142-10.216.138.23-1585685654447:blk_2926076096_1852445656
does not have enough number of replicas.

Judging from the error, writing to HDFS is failing. I suggest checking whether HDFS is healthy.

Best,
Lijie

陈卓宇 <2572805...@qq.com.invalid> wrote on Fri, Jun 24, 2022 at 12:00:

> Flink version: 1.13.1
> HDFS: 3.x
> Exception log:
>
> 2022-06-24 10:58:19,839 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Decline
> checkpoint 1101 by task b3d88f9ef72bda003056856c4422742d of job
> 6bd7dc46451f01e008762c9b556cb08f at zhaohy4-test-taskmanager-1-1 @
> 10.42.5.55 (dataPort=40558).
>
> org.apache.flink.util.SerializedThrowable: Asynchronous task checkpoint
> failed.
>
> at
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:279)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>
> at
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:175)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [?:1.8.0_202]
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [?:1.8.0_202]
>
> at java.lang.Thread.run(Thread.java:748)
> [?:1.8.0_202]
>
> Caused by: org.apache.flink.util.SerializedThrowable: Could not
> materialize checkpoint 1101 for operator IntervalJoin(joinType=[InnerJoin],
> windowBounds=[isRowTime=true, leftLowerBound=-129600,
> leftUpperBound=129600, leftTimeIndex=4, rightTimeIndex=4],
> where=[((hire_contract_id = id) AND (last_modify_time >=
> (last_modify_time0 - 129600:INTERVAL DAY)) AND (last_modify_time <=
> (last_modify_time0 + 129600:INTERVAL DAY)))], select=[hire_contract_id,
> hire_status_code, sign_date, confirm_date, last_modify_time, proctime, id,
> hire_contract_code, ziroom_version_id, is_del, last_modify_time0]) ->
> Calc(select=[hire_contract_id, hire_status_code, sign_date, confirm_date,
> last_modify_time, proctime, hire_contract_code, ziroom_version_id, is_del
> AS is_del0, last_modify_time0]) (1/1)#3.
>
> at
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:257)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>
> ... 4 more
>
> Caused by: org.apache.flink.util.SerializedThrowable: java.io.IOException:
> Could not flush to file and close the file system output stream to
> hdfs://zrHdfsHa/user/flink/checkpointsdata/6bd7dc46451f01e008762c9b556cb08f/shared/5a5118ba-427f-4234-8e36-ec8d24418fe4
> in order to obtain the stream state handle
>
> at
> java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:1.8.0_202]
>
> at
> java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[?:1.8.0_202]
>
> at
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:636)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>
> at
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer. ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>
> at
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:128)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>
> ... 3 more
>
> Caused by: org.apache.flink.util.SerializedThrowable: Could not flush to
> file and close the file system output stream to
> hdfs://zrHdfsHa/user/flink/checkpointsdata/6bd7dc46451f01e008762c9b556cb08f/shared/5a5118ba-427f-4234-8e36-ec8d24418fe4
> in order to obtain the stream state handle
>
> at
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:373)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>
> at
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:143)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>
> at
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:101)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>
> at
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:32)
> ~[flink-core-1.13.1.jar:1.13.1]
>
> at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> ~[?:1.8.0_202]
>
> ... 3 more
>
> Caused by: org.apache.flink.util.SerializedThrowable: Unable to close file
> because the last
> blockBP-1965840142-10.216.138.23-1585685654447:blk_2926076096_1852445656
> does not have enough number of replicas.
>
> at
> org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:966)
> ~[flink-shaded-hadoop-3-uber-3.1.1.7.2.1.0-327-9.0.jar:3.1.1.7.2.1.0-327-9.0]
>
> at
> org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:909)
> ~[flink-shaded-hadoop-3-uber-3.1.1.7.2.1.0-327-9.0.jar:3.1.1.7.2.1.0-327-9.0]
>
> at
> org.apache.hadoop.hdfs.DFSOutputStream.closeI

Re: Flink 1.10.1: after cancelling a job in the Flink UI, it stays in cancelling for a long time before it is finally cancelled

2022-06-20 Post by Lijie Wang
Hi, if convenient, please capture a thread dump of the TM hosting the task that stays in CANCELING. That will show where the task thread is stuck.

Best,
Lijie

沈保源 <757434...@qq.com.invalid> wrote on Fri, Jun 17, 2022 at 16:47:

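> There is nothing notable in the JobManager log, and the task doesn't use any UDF; it is just the simplest CEP
> pattern group. So far this problem only shows up with CEP: if during that period there are many log records matching pattern 1
> but none matching pattern 2, this happens on cancellation. Also, in this situation the CEP API's watermark delay is very high. Could that be the cause?
> The task logged one line: Discovered coordinator :9092 (id: 2147483647 rack: null)
> for group merge_341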
>
>
>
>
> -- Original message --
> From: "Weihua Hu" Sent: Thursday, June 16, 2022, 9:10 PM
> To: "user-zh" Subject: Re: Flink 1.10.1: after cancelling a job in the Flink UI, it stays in cancelling for a long time before it is finally cancelled
>
>
>
> Hi,
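> I suggest looking at the JobManager log to see what state the job is in while it is Canceling. Also check whether the task uses a UDF, and whether the UDF's
> close method does anything time-consuming.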
>
> Best,
> Weihua
>
>
> On Thu, Jun 16, 2022 at 3:11 PM 沈保源 <757434...@qq.com.invalid> wrote:
>
> > Flink 1.10.1: after cancelling a job in the Flink UI, it stays in cancelling for a long time before it is finally cancelled


Re: Unsubscribe

2022-06-08 Post by Lijie Wang
Hi, to unsubscribe, send an email to user-zh-unsubscr...@flink.apache.org, not to
user-zh@flink.apache.org

Best,
Lijie

黎永康 wrote on Thu, Jun 9, 2022 at 09:56:

> Unsubscribe


Re: TaskManager exits after Flink runs for a while, reporting OutOfMemoryError: Metaspace

2022-06-08 Post by Lijie Wang
The error shows a metaspace OOM. As the message suggests, you can increase taskmanager.memory.jvm-metaspace.size,
or increase the total TM memory.
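The two options interact. Metaspace comes out of the same process budget (`taskmanager.memory.process.size`) as everything else, so raising it alone shrinks the memory left for heap, managed and network memory.

Here is a rough sketch of that effect. It assumes the process size is the configured value. It also assumes JVM overhead follows the default rule: 10% of the process size, clamped between 192 MB and 1024 MB. The function name is hypothetical.

```python
def total_flink_memory_mb(process_mb: float, metaspace_mb: float) -> float:
    """Memory left for heap, managed and network after metaspace and JVM overhead.

    Assumes the default JVM-overhead rule: 10% of the process size,
    clamped to [192 MB, 1024 MB].
    """
    overhead = min(max(0.1 * process_mb, 192.0), 1024.0)
    remaining = process_mb - metaspace_mb - overhead
    if remaining <= 0:
        raise ValueError("metaspace and overhead exceed the process size")
    return remaining


print(total_flink_memory_mb(1728, 256))   # default metaspace
print(total_flink_memory_mb(1728, 512))   # bigger metaspace, same process size
print(total_flink_memory_mb(2048, 512))   # bigger metaspace and bigger process
```

Doubling metaspace inside an unchanged 1728 MB process takes 256 MB away from the rest of Flink's memory. That is why increasing the total TM memory is the natural companion option.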

Best,
Lijie

weishishuo...@163.com wrote on Tue, Jun 7, 2022 at 18:37:

> The versions I'm using:
> Flink: 1.13.2
> Flink CDC: flink-connector-jdbc_2.11-1.13.2.jar
> flink-sql-connector-mysql-cdc-2.2.0.jar
> flink-sql-connector-postgres-cdc-2.2.0.jar
>
> The job is fairly simple: it syncs data from MySQL and PG to PG and MySQL using the SQL API. Has anyone run into this problem?
>
> 2022-06-07 18:13:59,393 ERROR
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner  [] - Fatal
> error occurred while executing the TaskManager. Shutting it down...
> java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error
> has occurred. This can mean two things: either the job requires a larger
> size of JVM metaspace to load classes or there is a class loading leak. In
> the first case 'taskmanager.memory.jvm-metaspace.size' configuration option
> should be increased. If the error persists (usually in cluster after
> several job (re-)submissions) then there is probably a class loading leak
> in user code or some of its dependencies which has to be investigated and
> fixed. The task executor has to be shutdown...
> at java.lang.ClassLoader.defineClass1(Native Method) ~[?:1.8.0_112]
> at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
> ~[?:1.8.0_112]
> at
> java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
> ~[?:1.8.0_112]
> at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
> ~[?:1.8.0_112]
> at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
> ~[?:1.8.0_112]
> at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
> ~[?:1.8.0_112]
> at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
> ~[?:1.8.0_112]
> at java.security.AccessController.doPrivileged(Native Method)
> ~[?:1.8.0_112]
> at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
> ~[?:1.8.0_112]
> at
> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:71)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> at
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
> [flink-dist_2.11-1.13.2.jar:1.13.2]
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357) [?:1.8.0_112]
> at io.debezium.relational.Column.editor(Column.java:31)
> [blob_p-343be43b7874de49f6ce1d8bcb6a90a384203530-2e5afb6f8bf4834164c1bb92aaf97a00:2.2.0]
> at
> io.debezium.connector.postgresql.connection.PostgresConnection.readTableColumn(PostgresConnection.java:464)
> [blob_p-343be43b7874de49f6ce1d8bcb6a90a384203530-2e5afb6f8bf4834164c1bb92aaf97a00:2.2.0]
> at
> io.debezium.jdbc.JdbcConnection.getColumnsDetails(JdbcConnection.java:1226)
> [blob_p-343be43b7874de49f6ce1d8bcb6a90a384203530-2e5afb6f8bf4834164c1bb92aaf97a00:2.2.0]
> at
> io.debezium.jdbc.JdbcConnection.readSchema(JdbcConnection.java:1182)
> [blob_p-343be43b7874de49f6ce1d8bcb6a90a384203530-2e5afb6f8bf4834164c1bb92aaf97a00:2.2.0]
> at
> io.debezium.connector.postgresql.PostgresSchema.refresh(PostgresSchema.java:100)
> [blob_p-343be43b7874de49f6ce1d8bcb6a90a384203530-2e5afb6f8bf4834164c1bb92aaf97a00:2.2.0]
> at
> io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource.connectionCreated(PostgresSnapshotChangeEventSource.java:95)
> [blob_p-343be43b7874de49f6ce1d8bcb6a90a384203530-2e5afb6f8bf4834164c1bb92aaf97a00:2.2.0]
> at
> io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:103)
> [blob_p-343be43b7874de49f6ce1d8bcb6a90a384203530-2e5afb6f8bf4834164c1bb92aaf97a00:2.2.0]
>
>
>
>
> weishishuo...@163.com
>


Re: [ANNOUNCE] Apache Flink Kubernetes Operator 1.0.0 released

2022-06-08 Post by Lijie Wang
Congrats! Thanks Yang for driving the release, and thanks to all
contributors!

Best,
Lijie

John Gerassimou wrote on Mon, Jun 6, 2022 at 22:38:

> Thank you for all your efforts!
>
> Thanks
> John
>
> On Sun, Jun 5, 2022 at 10:33 PM Aitozi  wrote:
>
>> Thanks Yang, and nice to see it happen.
>>
>> Best,
>> Aitozi.
>>
>> Yang Wang wrote on Sun, Jun 5, 2022 at 16:14:
>>
>>> The Apache Flink community is very happy to announce the release of
>>> Apache Flink Kubernetes Operator 1.0.0.
>>>
>>> The Flink Kubernetes Operator allows users to manage their Apache Flink
>>> applications and their lifecycle through native k8s tooling like kubectl.
>>> This is the first production ready release and brings numerous
>>> improvements and new features to almost every aspect of the operator.
>>>
>>> Please check out the release blog post for an overview of the release:
>>>
>>> https://flink.apache.org/news/2022/06/05/release-kubernetes-operator-1.0.0.html
>>>
>>> The release is available for download at:
>>> https://flink.apache.org/downloads.html
>>>
>>> Maven artifacts for Flink Kubernetes Operator can be found at:
>>>
>>> https://search.maven.org/artifact/org.apache.flink/flink-kubernetes-operator
>>>
>>> Official Docker image for Flink Kubernetes Operator applications can be
>>> found at:
>>> https://hub.docker.com/r/apache/flink-kubernetes-operator
>>>
>>> The full release notes are available in Jira:
>>>
>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351500
>>>
>>> We would like to thank all contributors of the Apache Flink community
>>> who made this release possible!
>>>
>>> Regards,
>>> Gyula & Yang
>>>
>>


Re: [ANNOUNCE] Open source of remote shuffle project for Flink batch data processing

2021-12-05 Post by Lijie Wang
As one of the contributors to flink remote shuffle, I'm glad to hear all
the warm responses! More people are welcome to try flink remote shuffle, and we
look forward to your feedback.

Best,
Lijie

Yingjie Cao wrote on Wed, Dec 1, 2021 at 17:50:

> Hi Jiangang,
>
> Great to hear that, welcome to work together to make the project better.
>
> Best,
> Yingjie
>
> 刘建刚 wrote on Wed, Dec 1, 2021 at 3:27 PM:
>
>> Good work for flink's batch processing!
>> The remote shuffle service can solve the lost-container problem and reduce
>> the running time of batch jobs after a failover. We have investigated the
>> component a lot and welcome Flink's native solution. We will try it and
>> help improve it.
>>
>> Thanks,
>> Liu Jiangang
>>
>> Yingjie Cao wrote on Tue, Nov 30, 2021 at 9:33 PM:
>>
>> > Hi dev & users,
>> >
>> > We are happy to announce the open-sourcing of the remote shuffle project [1] for
>> > Flink. The project originated at Alibaba, and the main motivation is to
>> > improve batch data processing in both performance and stability and to further
>> > embrace cloud native. For more features of the project, please refer to
>> > [1].
>> >
>> > Before going open source, the project had been used widely in production,
>> > and it behaves well in both stability and performance. We hope you enjoy
>> > it. Collaboration and feedback are highly appreciated.
>> >
>> > Best,
>> > Yingjie on behalf of all contributors
>> >
>> > [1] https://github.com/flink-extended/flink-remote-shuffle
>> >
>>
>


Re: Fwd: Problems testing the Flink 1.10 integration with HBase

2020-06-03 Post by Lijie Wang
This happens because the class is not on the classpath. You need to find out which jar contains the class and whether that jar is in Flink's lib directory.
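This check can be scripted. Here is a small sketch that lists which jars in a directory contain a given class, by looking for its `.class` entry inside each jar (jars are zip archives). The function name is hypothetical. Pass it the fully qualified class name from the error and the path of Flink's `lib` directory.

```python
import zipfile
from pathlib import Path


def jars_containing(class_name: str, lib_dir: str) -> list[str]:
    """Return names of jars in lib_dir that contain the fully qualified class."""
    entry = class_name.replace(".", "/") + ".class"
    hits = []
    for jar in sorted(Path(lib_dir).glob("*.jar")):
        try:
            with zipfile.ZipFile(jar) as zf:
                if entry in zf.namelist():
                    hits.append(jar.name)
        except zipfile.BadZipFile:
            continue  # skip corrupt files or non-zip files named *.jar
    return hits
```

An empty result for Flink's `lib` means the class is not provided from there. It then has to come either from the job jar or from a jar added to `lib`.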




On Jun 3, 2020 22:52, liunaihua521 wrote:





- Forwarded message -

From: liunaihua521
Date: Jun 3, 2020 22:18
To: user-zh-i...@flink.apache.org, user-zh-...@flink.apache.org
Subject: Problems testing the Flink 1.10 integration with HBase
hi!
Versions:
Flink 1.10
HBase 2.2.4
ZK 3.6.1
Hadoop 2.10.0


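Program description:

The program simply implements a RichSourceFunction and a RichSinkFunction to read from and write to HBase. It is packaged and submitted to a cluster running in standalone mode.


Error description:
After submitting the job, it always fails with the following error (the text is also attached):

or



What I tried:
Attempt 1:
Flink's lib directory contains the following jars:
The submitted jar turned out not to contain the following two classes
Running it fails with an error


Attempt 2:
Moved guava-11.0.2.jar into Hadoop's lib directory; running it again still fails


Result:
Every attempt fails with the same error. Any pointers would be much appreciated; thanks in advance!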


Re: How to write a custom connector for Flink

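2020-05-27 post by Lijie Wang
This problem can indeed occur. It is enough to make sure that, when adding dependencies, you don't package the dependencies Flink already provides (provided scope) into your jar. You can also solve it by switching to parent-first class loading.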
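To see why parent-first helps, here is a simplified model (not Flink's actual class loader code) of the two values of `classloader.resolve-order`. With the default child-first order, a class present both in `lib/` and in the job jar is taken from the job jar for user code, while code loaded from `lib/` keeps seeing the `lib/` copy; two `Class` objects with the same name then coexist, which is the `X can't be cast to X` issue raised in the quoted message. The always-parent-first prefixes below are only a partial, illustrative subset of Flink's defaults, and `com.example.MyTableFactory` is a hypothetical class name.

```java
import java.util.List;
import java.util.Set;

/** Simplified, hypothetical model of where Flink's user-code class loader takes a class from. */
public final class ResolveOrderModel {

    public enum Source { FLINK_LIB, USER_JAR, NOT_FOUND }

    /**
     * @param inLib             classes available from Flink's lib/ (the parent class loader)
     * @param inUserJar         classes packaged in the submitted job jar
     * @param childFirst        true models "child-first" (the default), false models "parent-first"
     * @param alwaysParentFirst name prefixes that are always resolved parent-first
     */
    public static Source resolve(String className, Set<String> inLib, Set<String> inUserJar,
                                 boolean childFirst, List<String> alwaysParentFirst) {
        boolean forcedParentFirst = alwaysParentFirst.stream().anyMatch(className::startsWith);
        // Child-first: look in the job jar before asking the parent, unless the prefix is exempt.
        if (childFirst && !forcedParentFirst && inUserJar.contains(className)) {
            return Source.USER_JAR;
        }
        // Parent-first (or exempt prefix): the parent wins if it has the class ...
        if (inLib.contains(className)) {
            return Source.FLINK_LIB;
        }
        // ... and the job jar is only a fallback.
        return inUserJar.contains(className) ? Source.USER_JAR : Source.NOT_FOUND;
    }

    public static void main(String[] args) {
        String factory = "com.example.MyTableFactory";   // hypothetical connector class
        Set<String> lib = Set.of(factory);               // copy placed in FLINK_HOME/lib
        Set<String> jar = Set.of(factory);               // second copy bundled in the job jar
        List<String> exempt = List.of("java.", "org.apache.flink.");
        System.out.println("child-first:  " + resolve(factory, lib, jar, true, exempt));
        System.out.println("parent-first: " + resolve(factory, lib, jar, false, exempt));
    }
}
```

The model prints `USER_JAR` for child-first and `FLINK_LIB` for parent-first. The real switch is `classloader.resolve-order: parent-first` in `flink-conf.yaml`; marking Flink-provided dependencies as Maven `provided` removes the duplicate copy altogether, which is why it fixes the problem as well.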




On 2020-05-28 11:03, forideal wrote:
Hi 111,

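Regarding the second point:
`2 Set the FLINK_HOME environment variable and put the custom connector under FLINK_HOME/lib`
Doesn't doing this carry the risk of an `X can't be cast to X` error?
Putting the connector in lib can lead to class loader problems, whose direct symptom is the `X can't be cast to X` problem [1]. Of course, this only means it can happen. For example, if we put user code into Flink's lib and then run a job by uploading a jar that also contains the code from lib, this problem is triggered.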
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/debugging_classloading.html#x-cannot-be-cast-to-x-exceptions


Best
forideal

On 2020-05-28 10:16:45, "111" wrote:
Hi,
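If you want to use it in the SQL Gateway, check the following conditions:
1 Meet the SPI requirements so that Flink can discover the implementation class automatically
2 Set the FLINK_HOME environment variable and put the custom connector under FLINK_HOME/lib
3 If you integrate with Hive and use HiveCatalog, register the table first
Then it can be used.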
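For condition 1: Flink 1.10 discovers table factories through Java's `ServiceLoader`, so the connector jar needs a provider-configuration file `META-INF/services/org.apache.flink.table.factories.TableFactory` that lists the factory class. A minimal sketch for checking which providers are actually visible on a given classpath; the helper name `SpiCheck` is hypothetical.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;

/** Hypothetical helper: lists the providers registered for an SPI interface via META-INF/services. */
public final class SpiCheck {

    public static List<String> registeredProviders(String spiInterface, ClassLoader loader) {
        List<String> providers = new ArrayList<>();
        try {
            // Every jar on the classpath may contribute its own copy of the services file.
            Enumeration<URL> files = loader.getResources("META-INF/services/" + spiInterface);
            while (files.hasMoreElements()) {
                URL file = files.nextElement();
                try (BufferedReader in = new BufferedReader(
                        new InputStreamReader(file.openStream(), StandardCharsets.UTF_8))) {
                    String line;
                    while ((line = in.readLine()) != null) {
                        int comment = line.indexOf('#');            // '#' starts a comment
                        String name = (comment >= 0 ? line.substring(0, comment) : line).trim();
                        if (!name.isEmpty()) {
                            providers.add(name + "  (from " + file + ")");
                        }
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return providers;
    }

    public static void main(String[] args) {
        String spi = args.length > 0 ? args[0] : "org.apache.flink.table.factories.TableFactory";
        List<String> providers = registeredProviders(spi, SpiCheck.class.getClassLoader());
        if (providers.isEmpty()) {
            System.out.println("No providers registered for " + spi);
        }
        providers.forEach(System.out::println);
    }
}
```

Running it with `FLINK_HOME/lib/*` on the classpath should list the custom factory next to the built-in ones; if it does not appear, the services file is missing or misnamed in the connector jar.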
Best,
Xinghalo


??????flink????????????????????instance??,StandaloneSessionClusterEntrypoint????????kill

2020-05-26 post by Lijie Wang
jps ?? kill 
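Finding the leftover JVMs (as `jps` does) and then killing them can also be scripted. The sketch below assumes Java 9+ (`ProcessHandle`) on a Unix-like host that exposes other processes' command lines. It lists processes whose command line contains the main class that Flink 1.10's standalone scripts start for the JobManager (`StandaloneSessionClusterEntrypoint`, as in the subject) or the TaskManager, and only asks them to terminate when run with `--kill`. `FlinkDaemons` and the `--kill` flag are hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper: finds (and optionally stops) standalone Flink daemons on this host. */
public final class FlinkDaemons {

    // Main classes started by Flink 1.10's standalone scripts for the JobManager and TaskManager.
    private static final List<String> MAIN_CLASSES = List.of(
            "org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint",
            "org.apache.flink.runtime.taskexecutor.TaskManagerRunner");

    /** True if the JVM command line launches one of the Flink daemon main classes. */
    public static boolean isFlinkDaemon(String commandLine) {
        return MAIN_CLASSES.stream().anyMatch(commandLine::contains);
    }

    public static void main(String[] args) {
        boolean kill = args.length > 0 && args[0].equals("--kill");
        List<ProcessHandle> daemons = ProcessHandle.allProcesses()
                .filter(p -> p.info().commandLine().map(FlinkDaemons::isFlinkDaemon).orElse(false))
                .collect(Collectors.toList());
        for (ProcessHandle p : daemons) {
            System.out.println(p.pid() + "  " + p.info().commandLine().orElse("?"));
            if (kill) {
                p.destroy(); // requests normal termination (SIGTERM on Unix-like systems), like `kill <pid>`
            }
        }
    }
}
```

Once the stray daemons are gone, `stop-cluster.sh` and `start-cluster.sh` should no longer report extra running instances.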




On 2020-05-27 13:55, smq<374060...@qq.com> wrote:
start-cluster.sh??
[root@node01 bin]# stop-cluster.sh
No taskexecutor daemon (pid: 11978) is running anymore on node01.
No taskexecutor daemon (pid: 5885) is running anymore on node02.
No taskexecutor daemon (pid: 4529) is running anymore on node03.
No standalonesession daemon (pid: 11633) is running anymore on node01.
No standalonesession daemon (pid: 5554) is running anymore on node02.
start,stop??





--  --
From: "Lijie Wang"

??????flink????????????????????instance??,StandaloneSessionClusterEntrypoint????????kill

2020-05-26 post by Lijie Wang
 cluster ?? 
start-cluster.sh??cluster stop-cluster.sh?? ?? 
start-cluster.sh




On 2020-05-27 13:39, smq<374060...@qq.com> wrote:
hi
??.??



[root@node01 bin]# start-cluster.sh
Starting HA cluster with 2 masters.
[INFO] 5 instance(s) of standalonesession are already running on node01.
Starting standalonesession daemon on host node01.
[INFO] 1 instance(s) of standalonesession are already running on node02.
Starting standalonesession daemon on host node02.
[INFO] 1 instance(s) of taskexecutor are already running on node01.
Starting taskexecutor daemon on host node01.
[INFO] 3 instance(s) of taskexecutor are already running on node02.
Starting taskexecutor daemon on host node02.
[INFO] 6 instance(s) of taskexecutor are already running on node03.
Starting taskexecutor daemon on host node03.

Re: Flink 1.10 web UI does not show print output

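2020-05-26 post by Lijie Wang
This needs no configuration and is supported by all versions; take a look at the contents of taskmanager.out. Also, check whether your print logic actually runs on the TM side; it may be executed on the client side instead.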




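On 2020-05-26 21:39, smq<374060...@qq.com> wrote:
Hi
The results printed by my code don't show up under stdout in the web UI, but in blog posts I've read online, some people can see the printed output there (just not on version 1.10).
Is this a configuration problem, or does this version not support it?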

Re:Could not find a suitable table factory for 'TableSourceFactory'

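2020-05-24 post by Lijie Wang
Hi, I can't load the images in your email. From the error below, it looks like no matching connector could be found. Check whether the properties in the WITH clause of your DDL are correct.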



On 2020-05-25 00:11:16, "macia kk" wrote:

Could someone help me look at this problem? Thanks.

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: findAndCreateTableSource failed.
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.
Reason: Required context properties mismatch.

The matching candidates:
org.apache.flink.table.sources.CsvAppendTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'
'format.type' expects 'csv', but is 'json'

The following properties are requested:
connector.properties.bootstrap.servers=ip-10-128-145-1.idata-server.shopee.io:9092
connector.properties.group.id=keystats_aripay
connector.property-version=1
connector.startup-mode=latest-offset
connector.topic=ebisu_wallet_id_db_mirror_v1
connector.type=kafka
format.property-version=1
format.type=json
schema.0.data-type=INT
schema.0.name=ts
schema.1.data-type=VARCHAR(2147483647)
schema.1.name=table
schema.2.data-type=VARCHAR(2147483647)
schema.2.name=database
update-mode=append

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52)
... 39 more
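The "Required context properties mismatch" reason comes from comparing each factory's required context with the properties generated from the DDL. Below is a simplified model of that comparison (not Flink's actual `TableFactoryService` code) that reproduces the two mismatch lines reported for `CsvAppendTableSourceFactory`. Note that the requested properties contain no `connector.version`; the Flink 1.10 Kafka connector documentation lists that key as required (for example `'connector.version' = 'universal'`), so the second map is a hypothetical Kafka context modeled on that documentation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of matching a factory's required context against requested properties. */
public final class ContextMatch {

    /** Returns one message per required key that is missing or has a different value. */
    public static List<String> mismatches(Map<String, String> required, Map<String, String> requested) {
        List<String> problems = new ArrayList<>();
        for (Map.Entry<String, String> e : required.entrySet()) {
            String actual = requested.get(e.getKey());
            if (actual == null) {
                problems.add("'" + e.getKey() + "' is missing");
            } else if (!actual.equals(e.getValue())) {
                problems.add("'" + e.getKey() + "' expects '" + e.getValue() + "', but is '" + actual + "'");
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        // A few of the requested properties from the error above.
        Map<String, String> requested = new LinkedHashMap<>();
        requested.put("connector.type", "kafka");
        requested.put("format.type", "json");
        requested.put("update-mode", "append");

        // Required context of CsvAppendTableSourceFactory, as far as the error message shows it.
        Map<String, String> csv = new LinkedHashMap<>();
        csv.put("connector.type", "filesystem");
        csv.put("format.type", "csv");
        System.out.println("CSV:   " + mismatches(csv, requested));

        // Hypothetical Kafka context modeled on the Flink 1.10 Kafka connector docs.
        Map<String, String> kafka = new LinkedHashMap<>();
        kafka.put("connector.type", "kafka");
        kafka.put("connector.version", "universal");
        kafka.put("update-mode", "append");
        System.out.println("Kafka: " + mismatches(kafka, requested));
    }
}
```

With this sample data the CSV comparison reproduces the two mismatch lines from the error, and the Kafka comparison reports only the missing `connector.version`, which is worth checking in the DDL's WITH clause.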