Re: Re: Re: idea导入flink源码问题(看不到图片,贴了下文字)
Hi penguin, 虽然你配置了多个mirror,但实际起作用的是第一个mirror。第一个mirror针对大部分情况够用了,我这边在阿里云仓库可以正常下载json-smart:2.3,你这边没有贴出来pom.xml,没有, 所以我这边没有一一尝试。但无非问题主要是以下方面排查: 1、你要确定一下,你是不是用的这个settings.xml; 2、针对特定的不能从阿里云仓库下载的jar包,可以在pom.xml中配置 和 ,以 WSO2 Maven2 Repository仓库为例,配置如下: wso2-maven2-repository WSO2 Maven2 Repository https://maven.wso2.org/nexus/content/repositories/releases/ wso2-maven2-repository WSO2 Maven2 Repository https://maven.wso2.org/nexus/content/repositories/releases/ 祝好 penguin. 于2021年1月13日周三 下午4:09写道: > pom文件就是源码里面的那个 > > > setting.xml: > > > > > > > > > alimaven > > aliyun maven > > http://maven.aliyun.com/nexus/content/groups/public/ > > central > > > > > > mapr-public > > mapr-releases > > mapr-releases > > https://maven.aliyun.com/repository/mapr-public > > > > > > ali maven > > aliyun maven > > https://maven.aliyun.com/repository/public/ > > central > > > > > > ui > > central > > Human Readable Name for this Mirror. > > http://uk.maven.org/maven2/ > > > > > > ibiblio > > central > > Human Readable Name for this Mirror. > > http://mirrors.ibiblio.org/pub/mirrors/maven2/ > > > > > > > > > jboss-public-repository-group > > central > > JBoss Public Repository Group > > http://repository.jboss.org/nexus/content/groups/public > > > > > > > > CN > > OSChina Central > > http://maven.oschina.net/content/groups/public/ > > central > > > > > > net-cn > > central > > Human Readable Name for this Mirror. > > http://maven.net.cn/content/groups/public/ > > > > > > JBossJBPM > > central > > JBossJBPM Repository > > https://repository.jboss.org/nexus/content/repositories/releases/ > > > > > > > > > > > > > > > > > > > > 在 2021-01-13 15:11:16,"Carmen Free" 写道: > >建议你把Maven的settings.xml以及你项目的pom.xml也贴出来看看,大家才好帮你定位问题 > > > >penguin. 于2021年1月13日周三 下午2:25写道: > > > >> 贴不了图,我直接放文字吧 > >> > >> > >> > >> ▼θSync: at 2021/1/13 12:05 with 18 errors > >> > >>▼Resolve dependencies 4 errors > >> > >> Cannot resolve netminidev:json-smart:2.3 > >> > >> Cannot resolve io.confluent:kafka-schema-registry-client:4.1.0 > >> > >> Cannot resolve com.nimbusds:nimbus-jose-jwt:9.4.1 > >> Cannot resolve com.nimbusds:lang-tag:1.5 > >>▼Resolve plugins 14 errors > >> Cannot resolve plugin > org.codehaus.mojo:build-helper-maven-plugin: > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> 在 2021-01-13 13:42:27,"Carmen Free" 写道: > >> >hi, > >> > > >> >图看不见。 > >> > > >> >penguin. 于2021年1月13日周三 下午1:19写道: > >> > > >> >> 已经在maven的setting文件中配置了好几个镜像了,还是这样。如下图 > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >
Re: Re: idea导入flink源码问题(看不到图片,贴了下文字)
建议你把Maven的settings.xml以及你项目的pom.xml也贴出来看看,大家才好帮你定位问题 penguin. 于2021年1月13日周三 下午2:25写道: > 贴不了图,我直接放文字吧 > > > > ▼θSync: at 2021/1/13 12:05 with 18 errors > >▼Resolve dependencies 4 errors > > Cannot resolve netminidev:json-smart:2.3 > > Cannot resolve io.confluent:kafka-schema-registry-client:4.1.0 > > Cannot resolve com.nimbusds:nimbus-jose-jwt:9.4.1 > Cannot resolve com.nimbusds:lang-tag:1.5 >▼Resolve plugins 14 errors > Cannot resolve plugin org.codehaus.mojo:build-helper-maven-plugin: > > > > > > > > > > > > > > > > > > 在 2021-01-13 13:42:27,"Carmen Free" 写道: > >hi, > > > >图看不见。 > > > >penguin. 于2021年1月13日周三 下午1:19写道: > > > >> 已经在maven的setting文件中配置了好几个镜像了,还是这样。如下图 > >> > >> > >> > >> > >> > >> >
Re: rocksdb作为statebackend时,TM节点挂掉了,为何任务不能恢复呢?
Hi 唐老师, 谢谢解惑,明白了。 祝好 Yun Tang 于2021年1月13日周三 下午2:59写道: > Hi > > 其实问题本质很简单,作为一个分布式计算引擎,需要一个分布式文件系统才能保证可以做到分布式检查点的创建和恢复 > [1]。没有必要再尝试将checkpoint写到本地磁盘上,否则一旦发生failover是无法正常恢复的。 > > > [1] > https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/checkpointing.html#prerequisites > > 祝好 > 唐云 > ____ > From: Carmen Free > Sent: Wednesday, January 13, 2021 11:28 > To: user-zh@flink.apache.org > Subject: Re: rocksdb作为statebackend时,TM节点挂掉了,为何任务不能恢复呢? > > Hi 唐老师, > > 我又重新尝试了新的场景。 > > 我将集群起来后,然后使A节点的TM失效( 此时集群中仅有A节点的JM, B节点的TM ),这个时候在flnk web > > ui界面提交新的任务,任务被调度到B节点TM,可以发现任务一直正常运行,但是trigger检查点快照时一直不成功。报错跟前文中描述恢复检查点时的错误很类似,只不过恢复检查点时,是找不到chk-xx文件,在这里是无法创建chk-xx文件。 > > 具体报错如下: > 2021-01-13 10:33:10,042 WARN > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received > late message for now expired checkpoint attempt 901 from task > 709c3296b490f2e9cb3179ac4d9bbadb of job 14cdc07090d9f22445ba61144f6edf4b at > a0c28dc25b207ca789592018235fda56 @ node204 (dataPort=34389). > 2021-01-13 10:33:12,836 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 902 @ 1610505192836 for job 14cdc07090d9f22445ba61144f6edf4b. > 2021-01-13 10:33:12,844 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline > checkpoint 902 by task a7f9e9a63f1857b02a9e52c9d6bfefc7 of job > 14cdc07090d9f22445ba61144f6edf4b at a0c28dc25b207ca789592018235fda56 @ > node204 (dataPort=34389). > 2021-01-13 10:33:12,844 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding > checkpoint 902 of job 14cdc07090d9f22445ba61144f6edf4b. > > java.lang.Exception: Could not materialize checkpoint 902 for operator > Source: Custom Source -> Flat Map -> Timestamps/Watermarks (1/2). > at > > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1221) > at > > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1163) > at > > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > > Caused by: java.util.concurrent.ExecutionException: java.io.IOException: > Could not flush and close the file system output stream to null in order to > obtain the stream state handle > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > > org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:461) > at > > org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:53) > at > > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1126) > ... 3 more > > Caused by: java.io.IOException: Could not flush and close the file system > output stream to null in order to obtain the stream state handle > at > > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:334) > at > > org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:179) > at > > org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108) > at > > org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > > org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458) > ... 5 more > > Caused by: java.io.IOException: Could not open output stream for state > backend > at > > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:367) > at > > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234) > at > > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:309) > ... 10 more > > Caused by: java.io.IOException: Mkdirs failed to create > file:/data/flink/checkpoints/14cdc07090d9f22445ba61144f6edf4b/chk-902 > at > > org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem.java:269) > at > > org.apache.flink.core.fs.SafetyNetWrapperFileSystem.c
Re: idea导入flink源码问题
hi, 图看不见。 penguin. 于2021年1月13日周三 下午1:19写道: > 已经在maven的setting文件中配置了好几个镜像了,还是这样。如下图 > > > > > >
Re: rocksdb作为statebackend时,TM节点挂掉了,为何任务不能恢复呢?
Hi 唐老师, 我又重新尝试了新的场景。 我将集群起来后,然后使A节点的TM失效( 此时集群中仅有A节点的JM, B节点的TM ),这个时候在flnk web ui界面提交新的任务,任务被调度到B节点TM,可以发现任务一直正常运行,但是trigger检查点快照时一直不成功。报错跟前文中描述恢复检查点时的错误很类似,只不过恢复检查点时,是找不到chk-xx文件,在这里是无法创建chk-xx文件。 具体报错如下: 2021-01-13 10:33:10,042 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 901 from task 709c3296b490f2e9cb3179ac4d9bbadb of job 14cdc07090d9f22445ba61144f6edf4b at a0c28dc25b207ca789592018235fda56 @ node204 (dataPort=34389). 2021-01-13 10:33:12,836 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 902 @ 1610505192836 for job 14cdc07090d9f22445ba61144f6edf4b. 2021-01-13 10:33:12,844 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline checkpoint 902 by task a7f9e9a63f1857b02a9e52c9d6bfefc7 of job 14cdc07090d9f22445ba61144f6edf4b at a0c28dc25b207ca789592018235fda56 @ node204 (dataPort=34389). 2021-01-13 10:33:12,844 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 902 of job 14cdc07090d9f22445ba61144f6edf4b. java.lang.Exception: Could not materialize checkpoint 902 for operator Source: Custom Source -> Flat Map -> Timestamps/Watermarks (1/2). at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1221) at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1163) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to null in order to obtain the stream state handle at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:192) at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:461) at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:53) at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1126) ... 3 more Caused by: java.io.IOException: Could not flush and close the file system output stream to null in order to obtain the stream state handle at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:334) at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:179) at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108) at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458) ... 5 more Caused by: java.io.IOException: Could not open output stream for state backend at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:367) at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234) at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:309) ... 10 more Caused by: java.io.IOException: Mkdirs failed to create file:/data/flink/checkpoints/14cdc07090d9f22445ba61144f6edf4b/chk-902 at org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem.java:269) at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:126) at org.apache.flink.core.fs.EntropyInjector.createEntropyAware(EntropyInjector.java:61) at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:356) ... 12 more 那么也就是说,如果JM节点不同时作为TM节点时,checkpoint根本无法触发,更无从谈及failover restore了。因此我们可以认为flink仅支持statebackend的checkpoint目录是DFS时,才可以对状态恢复有比较好的支持是吗?否则,就意味着,JM节点必须同时作为TM,并且每次任务要调度至该节点上运行,才可以触发checkpoint,最终也只能在该节点进行failover restore。感觉这样的话,就确实不太适合配置本地目录作为state.checkpoints.dir 了。 另外,您在回复中也提到,在使用本地目录作为 state.checkpoints.dir 时, “必须得让原task部署回原来的机器才能正常运行”,实际上意味着任务要被调度至JM所在节点才可以。然后,任务调度时,flink这边能人为指定任务的分配吗?我看前边的讨论,好像flink的任务调度基本是自动完成的,人为干预难度较大是吧? 祝好 Yun Tang 于2021年1月12日周二 下午7:11写道: > Hi, > > 从异常日志看,应该是因为你的state.checkpoints.dir 或者说 > statebackend的checkpoint目录配置成了本地目录,checkpoint保存到了本地机器上,所以在failover > restore的时候,必须得让原task部署回原来的机器才能正常运行。将state backend的checkpoint目录更换为一个DFS目录即可。 > > > 祝好 > 唐云 > ____
Re: rocksdb作为statebackend时,TM节点挂掉了,为何任务不能恢复呢?
你好,唐老师,谢谢解答。 不好意思,下面补充一下报错信息,刚才忘记说了。 主要报错信息如下,重新模拟了下: 2021-01-12 18:09:34,236 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: Custom Source -> Flat Map -> Timestamps/Watermarks (2/2) (3b50a7ce56b408c2978260846b76a28a) switched from RUNNING to FAILED on org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@4c107a3a. java.lang.Exception: Exception while creating StreamOperatorStateContext. at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for StreamSource_cbc357ccb763df2852fee8c4fc7d55f2_(2/2) from any of the 1 provided restore options. at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:252) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:139) ... 9 more Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86) at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:565) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:243) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) ... 11 more Caused by: java.io.FileNotFoundException: /data/flink/checkpoints/b261de0447d59bb2f1db9c084f5b1a0b/chk-5/4a160901-8ddd-468e-a82d-6efcb8a9dff9 (No such file or directory) at java.io.FileInputStream.open0(Native Method) at java.io.FileInputStream.open(FileInputStream.java:195) at java.io.FileInputStream.(FileInputStream.java:138) at org.apache.flink.core.fs.local.LocalDataInputStream.(LocalDataInputStream.java:50) at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142) at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85) at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68) at org.apache.flink.runtime.state.OperatorStreamStateHandle.openInputStream(OperatorStreamStateHandle.java:66) at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:73) at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83) ... 15 more 这个文件夹在A节点(JM)上是有的,难道是访问权限问题吗?B节点无法访问A节点吗,有点奇怪啊,配置了ssh免密的啊,文件夹/data/flink/checkpoints访问权限也设置成了777 Yun Tang 于2021年1月12日周二 下午5:46写道: > Hi > > Flink的容错机制是可以保证TM lost时候会尝试重启作业,“为何任务不能恢复”是需要看完整异常栈的,简单描述是无法帮助排查问题的。 > > 祝好 > 唐云 > ________ > From: Carmen Free > Sent: Tuesday, January 12, 2021 15:52 > To: user-zh@flink.apache.org > Subject: rocksdb作为statebackend时,TM节点挂掉了,为何任务不能恢复呢? > > hi, > > rocksdb作为statebackend时,TM节点挂掉了,为何任务不能恢复呢? > > 1、环境说明 > > flink版本:1.10.2 > 操作系统:centos 7 > > 2、集群说明(当前模拟了2节点) > > 节点A | 节点B > 角色| JM、TM|TM > taskslot | 4 | 4 > > 3、statebackend配置 > > # rocksdb作为状态后备 > state.backend: rocksdb > > > # 存储快照的目录(暂时使用的本地目录) > state.checkpoints.dir: file:///data/flink/checkpoints > > > 4、启动任务后,任务自动分配在A节点的TM上,运行一段时间后,检查点快照正常。接着,仅停掉A节点TM(JM仍正常运行),任务被自动调度至B节点的TM上,但是此时任务一直重启,无法恢复,这是为什么呢? > > > 5、如果我启动A节点,任务依旧无法恢复(此时任务仍在B节点运行),直到我停掉B节点TM,此时任务调度至A节点,任务可以正常恢复。所以有点疑问,4中的场景为何不能恢复任务呢?为什么只有在A节点上才可以进行任务恢复呢?最初以为是访问路径的问题,但是仔细想了想,检查点相关的操作一直都是JM进行的,我觉得只要JM没有挂掉,应该就可以将任务进行恢复啊,是我的理解有偏差吗? >
rocksdb作为statebackend时,TM节点挂掉了,为何任务不能恢复呢?
hi, rocksdb作为statebackend时,TM节点挂掉了,为何任务不能恢复呢? 1、环境说明 flink版本:1.10.2 操作系统:centos 7 2、集群说明(当前模拟了2节点) 节点A | 节点B 角色| JM、TM|TM taskslot | 4 | 4 3、statebackend配置 # rocksdb作为状态后备 state.backend: rocksdb # 存储快照的目录(暂时使用的本地目录) state.checkpoints.dir: file:///data/flink/checkpoints 4、启动任务后,任务自动分配在A节点的TM上,运行一段时间后,检查点快照正常。接着,仅停掉A节点TM(JM仍正常运行),任务被自动调度至B节点的TM上,但是此时任务一直重启,无法恢复,这是为什么呢? 5、如果我启动A节点,任务依旧无法恢复(此时任务仍在B节点运行),直到我停掉B节点TM,此时任务调度至A节点,任务可以正常恢复。所以有点疑问,4中的场景为何不能恢复任务呢?为什么只有在A节点上才可以进行任务恢复呢?最初以为是访问路径的问题,但是仔细想了想,检查点相关的操作一直都是JM进行的,我觉得只要JM没有挂掉,应该就可以将任务进行恢复啊,是我的理解有偏差吗?
Re: flink sql 消费鉴权的kafka,是怎么消费的呢?在kafka未添加鉴权之前,flink SQL消费kafka是正常的
好的,非常感谢。 赵一旦 于2021年1月6日周三 下午1:08写道: > 这个的话去看看KafkaConnector相关的参数,比较新的版本支持配置解析错误忽略。 > > Carmen Free 于2021年1月6日周三 上午10:58写道: > > > 感谢帮忙解决问题,确实包的路径有问题,换成这个包就解决了这个问题。 > > > > 紧接着我这边出现了新的异常 > > > > > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException: > > No content to map due to end-of-input at [Source:UNKONWN; line: -1, > column: > > -1;] > > > > 这个问题的原因,主要是由于kafka消息为空导致的,只要kafka消息不为空,就可以正常消费。 > > > > 但是如果遇到了kafka消息为空的情况,这边不能处理吗? > > > > 赵一旦 于2021年1月5日周二 下午9:18写道: > > > > > 我感觉还是jar的问题。如下尝试下,我懒得去试了。 > > > 将 org.apache.kafka.common.security.plain.PlainLoginModule 替换为 > > > org.apache.flink.kafka.shaded.org.apache.kafka.common.securi > > > ty.plain.PlainLoginModule > > > > > > 因为你用的是sql-connector-kafka,这个包把kafka-clients的包shade了。 > > > > > > Carmen Free 于2021年1月5日周二 下午5:09写道: > > > > > > > flink sql 消费鉴权的kafka,是怎么消费的呢?在kafka未添加鉴权之前,flink SQL消费kafka是正常的 > > > > > > > > 1、版本说明 > > > > flink版本:1.10.2 > > > > kafka版本:1.1.0 > > > > > > > > 2、kafka鉴权说明 > > > > 仅使用了sasl鉴权方式 > > > > 在kafka客户端有配置 kafka_server-jass.conf、 > > > > server.properties、producer.properties、consumer.properties > > > > > > > > 3、主要配置参数 > > > > sasl.mechanism=PLAIN > > > > security.protocol=SASL_PLAINTEXT > > > > > sasl.jaas.config=org.apache.kafka.comon.security.plain.PlainLoginModule > > > > required username="xx" password="xx-secret"; > > > > 当前配置方式,通过java程序来消费鉴权后的kafka,可以正常消费。 > > > > > > > > 4、用于flink SQL连接的jar包 > > > > flink-sql-connector-kafka_2.11-1.10.2.jar > > > > flink-jdbc_2.11-1.10.2.jar > > > > flink-csv-1.10.2-sql-jar.jar > > > > > > > > > > > > 5、我的思路 > > > > 类比java程序中添加kafka认证的相关参数(3中的参数,添加完毕即可),我直接在flink SQL中建立kafka > > > > table时,就指定了相关的3中所说明的参数。但是用flink SQL客户端来实现消费kafka,则会报错。 > > > > > > > > 6、启动客户端 > > > > ./bin/sql-client.sh embedded -l sql_lib/ > > > > 其中sql_lib文件夹放的是4中提及的flink-connector相关的jar包 > > > > > > > > > > > > 7、建表语句: > > > > create table test_hello ( > > > > name string > > > > ) with ( > > > > ... > > > > ... > > > > 'connector.properties.sasl.mechanism' = 'PLAIN', > > > > 'connector.properties.security.protocol' = 'SASL_PLAINTEXT', > > > > 'connector.properties.sasl.jaas.config' = > > > > 'org.apache.kafka.comon.security.plain.PlainLoginModule required > > > > username="xx" password="xx-secret";', > > > > 'format.type' = 'csv' > > > > ); > > > > > > > > 建表没有问题,可以正常建表。 > > > > > > > > 查询表的时候,就会报错,select * from test_hello; > > > > 报错如下: > > > > could not execute sql statement. Reason: > > > > javax.security.auth.login.loginException: unable to find loginModule > > > class: > > > > org.apache.kafka.common.security.plain.PlainLoginModule > > > > 但是这个类我在flink-sql-connector-kafka_2.11-1.10.2.jar包中是可以发现的,所以不知道什么原因? > > > > > > > > kafka添加鉴权后,我使用flink SQL的方式不对吗?希望有知道能帮忙解答一下,谢谢。 > > > > > > > > > >
Re: flink sql 消费鉴权的kafka,是怎么消费的呢?在kafka未添加鉴权之前,flink SQL消费kafka是正常的
感谢帮忙解决问题,确实包的路径有问题,换成这个包就解决了这个问题。 紧接着我这边出现了新的异常 org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map due to end-of-input at [Source:UNKONWN; line: -1, column: -1;] 这个问题的原因,主要是由于kafka消息为空导致的,只要kafka消息不为空,就可以正常消费。 但是如果遇到了kafka消息为空的情况,这边不能处理吗? 赵一旦 于2021年1月5日周二 下午9:18写道: > 我感觉还是jar的问题。如下尝试下,我懒得去试了。 > 将 org.apache.kafka.common.security.plain.PlainLoginModule 替换为 > org.apache.flink.kafka.shaded.org.apache.kafka.common.securi > ty.plain.PlainLoginModule > > 因为你用的是sql-connector-kafka,这个包把kafka-clients的包shade了。 > > Carmen Free 于2021年1月5日周二 下午5:09写道: > > > flink sql 消费鉴权的kafka,是怎么消费的呢?在kafka未添加鉴权之前,flink SQL消费kafka是正常的 > > > > 1、版本说明 > > flink版本:1.10.2 > > kafka版本:1.1.0 > > > > 2、kafka鉴权说明 > > 仅使用了sasl鉴权方式 > > 在kafka客户端有配置 kafka_server-jass.conf、 > > server.properties、producer.properties、consumer.properties > > > > 3、主要配置参数 > > sasl.mechanism=PLAIN > > security.protocol=SASL_PLAINTEXT > > sasl.jaas.config=org.apache.kafka.comon.security.plain.PlainLoginModule > > required username="xx" password="xx-secret"; > > 当前配置方式,通过java程序来消费鉴权后的kafka,可以正常消费。 > > > > 4、用于flink SQL连接的jar包 > > flink-sql-connector-kafka_2.11-1.10.2.jar > > flink-jdbc_2.11-1.10.2.jar > > flink-csv-1.10.2-sql-jar.jar > > > > > > 5、我的思路 > > 类比java程序中添加kafka认证的相关参数(3中的参数,添加完毕即可),我直接在flink SQL中建立kafka > > table时,就指定了相关的3中所说明的参数。但是用flink SQL客户端来实现消费kafka,则会报错。 > > > > 6、启动客户端 > > ./bin/sql-client.sh embedded -l sql_lib/ > > 其中sql_lib文件夹放的是4中提及的flink-connector相关的jar包 > > > > > > 7、建表语句: > > create table test_hello ( > > name string > > ) with ( > > ... > > ... > > 'connector.properties.sasl.mechanism' = 'PLAIN', > > 'connector.properties.security.protocol' = 'SASL_PLAINTEXT', > > 'connector.properties.sasl.jaas.config' = > > 'org.apache.kafka.comon.security.plain.PlainLoginModule required > > username="xx" password="xx-secret";', > > 'format.type' = 'csv' > > ); > > > > 建表没有问题,可以正常建表。 > > > > 查询表的时候,就会报错,select * from test_hello; > > 报错如下: > > could not execute sql statement. Reason: > > javax.security.auth.login.loginException: unable to find loginModule > class: > > org.apache.kafka.common.security.plain.PlainLoginModule > > 但是这个类我在flink-sql-connector-kafka_2.11-1.10.2.jar包中是可以发现的,所以不知道什么原因? > > > > kafka添加鉴权后,我使用flink SQL的方式不对吗?希望有知道能帮忙解答一下,谢谢。 > > >
flink sql 消费鉴权的kafka,是怎么消费的呢?在kafka未添加鉴权之前,flink SQL消费kafka是正常的
flink sql 消费鉴权的kafka,是怎么消费的呢?在kafka未添加鉴权之前,flink SQL消费kafka是正常的 1、版本说明 flink版本:1.10.2 kafka版本:1.1.0 2、kafka鉴权说明 仅使用了sasl鉴权方式 在kafka客户端有配置 kafka_server-jass.conf、 server.properties、producer.properties、consumer.properties 3、主要配置参数 sasl.mechanism=PLAIN security.protocol=SASL_PLAINTEXT sasl.jaas.config=org.apache.kafka.comon.security.plain.PlainLoginModule required username="xx" password="xx-secret"; 当前配置方式,通过java程序来消费鉴权后的kafka,可以正常消费。 4、用于flink SQL连接的jar包 flink-sql-connector-kafka_2.11-1.10.2.jar flink-jdbc_2.11-1.10.2.jar flink-csv-1.10.2-sql-jar.jar 5、我的思路 类比java程序中添加kafka认证的相关参数(3中的参数,添加完毕即可),我直接在flink SQL中建立kafka table时,就指定了相关的3中所说明的参数。但是用flink SQL客户端来实现消费kafka,则会报错。 6、启动客户端 ./bin/sql-client.sh embedded -l sql_lib/ 其中sql_lib文件夹放的是4中提及的flink-connector相关的jar包 7、建表语句: create table test_hello ( name string ) with ( ... ... 'connector.properties.sasl.mechanism' = 'PLAIN', 'connector.properties.security.protocol' = 'SASL_PLAINTEXT', 'connector.properties.sasl.jaas.config' = 'org.apache.kafka.comon.security.plain.PlainLoginModule required username="xx" password="xx-secret";', 'format.type' = 'csv' ); 建表没有问题,可以正常建表。 查询表的时候,就会报错,select * from test_hello; 报错如下: could not execute sql statement. Reason: javax.security.auth.login.loginException: unable to find loginModule class: org.apache.kafka.common.security.plain.PlainLoginModule 但是这个类我在flink-sql-connector-kafka_2.11-1.10.2.jar包中是可以发现的,所以不知道什么原因? kafka添加鉴权后,我使用flink SQL的方式不对吗?希望有知道能帮忙解答一下,谢谢。