I am not sure about the root cause, but it seems you could work around it by forcing the default NIO-based transport [1]. Add -Denv.java.opts="-Dcom.datastax.driver.FORCE_NIO=true" to your submission command.
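Concretely, the workaround would be applied like this (a sketch only: the jar and config paths follow the run-application command used later in this thread, and the FORCE_NIO property comes from the Cassandra-driver case in [1], so whether it has any effect on Lettuce here is unverified):

#!/bin/env bash

export FLINK_CONF_DIR=./conf
export HADOOP_CLASSPATH=`hadoop classpath`

# env.java.opts passes the system property to the JobManager/TaskManager JVMs,
# asking the client library to use the NIO transport instead of native epoll.
$FLINK_HOME/bin/flink run-application -t yarn-application \
  -Denv.java.opts="-Dcom.datastax.driver.FORCE_NIO=true" \
  hdfs:///jars/myjar.jar myconf.conf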
[1] https://stackoverflow.com/questions/48762857/java-lang-classcastexception-netty-fail-on-jar-execution-on-flink

Best,
Yang

On Wed, Dec 16, 2020 at 5:37 PM Dongwon Kim <eastcirc...@gmail.com> wrote:

> Hi Yang,
>
> Thanks for the detailed explanation!
>
>> Could you add "-Dyarn.per-job-cluster.include-user-jar=DISABLED" to your
>> command and have a try? After that, we will disable the inclusion of user
>> jars in the system classpath.
>
> I tried the following as you suggested:
>
> #!/bin/env bash
>
> export FLINK_CONF_DIR=./conf
> export HADOOP_CLASSPATH=`hadoop classpath`
>
> $FLINK_HOME/bin/flink run-application -t yarn-application \
>   -Dyarn.provided.lib.dirs='hdfs:///flink-dists/flink-1.12.0/lib;hdfs:///flink-dists/flink-1.12.0/plugins' \
>   -Dyarn.ship-files=myconf.conf \
>   -Dyarn.per-job-cluster.include-user-jar=DISABLED \
>   hdfs:///jars/myjar.jar myconf.conf
>
> Unfortunately, this attempt fails with the following exception on TMs:
>
>> 2020-12-16 18:29:37,859 WARN org.apache.flink.runtime.taskmanager.Task [] - enricher (1/1)#0 (add478e602e93e1720a3d92ebbab5cc6) switched from RUNNING to FAILED.
>> java.lang.ClassCastException: io.netty.channel.epoll.EpollEventLoopGroup cannot be cast to io.netty.channel.EventLoopGroup
>>   at io.lettuce.core.resource.DefaultEventLoopGroupProvider.getOrCreate(DefaultEventLoopGroupProvider.java:151) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>>   at io.lettuce.core.resource.DefaultEventLoopGroupProvider.allocate(DefaultEventLoopGroupProvider.java:89) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>>   at io.lettuce.core.AbstractRedisClient.doGetEventExecutor(AbstractRedisClient.java:275) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>>   at io.lettuce.core.AbstractRedisClient.getEventLoopGroup(AbstractRedisClient.java:264) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>>   at io.lettuce.core.AbstractRedisClient.channelType(AbstractRedisClient.java:246) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>>   at io.lettuce.core.RedisClient.connectStatefulAsync(RedisClient.java:315) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>>   at io.lettuce.core.RedisClient.connectStandaloneAsync(RedisClient.java:278) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>>   at io.lettuce.core.RedisClient.connect(RedisClient.java:211) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>>   at io.lettuce.core.RedisClient.connect(RedisClient.java:196) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>>   at com.kakaomobility.drivinghabit.stream.Enricher.open(Enricher.java:55) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>>   at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>   at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>   at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.open(AsyncWaitOperator.java:154) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:401) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>   at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.11-1.12.0.jar:1.12.0]
>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.11-1.12.0.jar:1.12.0]
>>   at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222]
>>   Suppressed: java.lang.ClassCastException: io.netty.channel.epoll.EpollEventLoopGroup cannot be cast to io.netty.util.concurrent.EventExecutorGroup
>>     at io.lettuce.core.resource.DefaultEventLoopGroupProvider.shutdown(DefaultEventLoopGroupProvider.java:292) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>>     at io.lettuce.core.resource.DefaultClientResources.shutdown(DefaultClientResources.java:648) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>>     at io.lettuce.core.AbstractRedisClient.closeClientResources(AbstractRedisClient.java:569) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>>     at io.lettuce.core.AbstractRedisClient.lambda$shutdownAsync$5(AbstractRedisClient.java:521) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>>     at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:981) ~[?:1.8.0_222]
>>     at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2124) ~[?:1.8.0_222]
>>     at io.lettuce.core.AbstractRedisClient.shutdownAsync(AbstractRedisClient.java:521) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>>     at io.lettuce.core.AbstractRedisClient.shutdown(AbstractRedisClient.java:485) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>>     at io.lettuce.core.AbstractRedisClient.shutdown(AbstractRedisClient.java:453) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>>     at com.kakaomobility.drivinghabit.stream.Enricher.close(Enricher.java:60) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>>     at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:740) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:720) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:643) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:552) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.11-1.12.0.jar:1.12.0]
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.11-1.12.0.jar:1.12.0]
>>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222]
>
> The exception seems to come from another operator, not Kafka; this operator
> performs async I/O using Lettuce, an asynchronous Redis client.
>
> Best,
>
> Dongwon
>
> On Wed, Dec 16, 2020 at 6:07 PM Yang Wang <danrtsey...@gmail.com> wrote:
>
>> Hi Dongwon,
>>
>> For application mode, the job submission happens on the JobManager side.
>> We are using an embedded client to submit the job, so the user jar is
>> added to the distributed cache. When a task is deployed to a TaskManager,
>> the jar is downloaded again and run in the user classloader even though
>> we already have it on the system classpath.
>>
>> I think that might be why these classes are loaded by different
>> classloaders.
>>
>> For per-job mode, we recover the job and the user jars are not added to
>> the distributed cache.
>>
>> Could you add "-Dyarn.per-job-cluster.include-user-jar=DISABLED" to your
>> command and have a try? After that, we will disable the inclusion of user
>> jars in the system classpath.
>>
>> Best,
>> Yang
>>
>> On Wed, Dec 16, 2020 at 4:20 PM Dongwon Kim <eastcirc...@gmail.com> wrote:
>>
>>> Robert,
>>>
>>>> But if Kafka is really only available in the user jar, then this error
>>>> still should not occur.
>>>
>>> I think so too; it should not occur. I scanned through all the jar files
>>> on the classpath using `jar tf`, but no jar contains
>>> org.apache.kafka.common.serialization.Deserializer with a different
>>> version.
>>>
>>>> In your case it seems that the classes are loaded from different
>>>> classloaders.
>>>
>>> Hmm, why did the artifact work fine with per-job cluster mode?
>>>
>>> p.s. Another user seems to face the same problem:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Duplication-error-on-Kafka-Connector-Libraries-td39805.html#a39812
>>>
>>> Thanks,
>>>
>>> Dongwon
>>>
>>> On Wed, Dec 16, 2020 at 4:45 PM Robert Metzger <rmetz...@apache.org> wrote:
>>>
>>>> Hey Dongwon,
>>>>
>>>> I don't think this is the intended behavior. I believe in application
>>>> mode, we are adding the user jar to the system classloader as well. In
>>>> your case it seems that the classes are loaded from different
>>>> classloaders. But if Kafka is really only available in the user jar,
>>>> then this error still should not occur.
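A minimal, self-contained Java sketch of the failure mode Yang describes (illustration only: "plugin.jar" and com.example.Greeter are hypothetical stand-ins for the user jar and a shared class, not the thread's code). Loading the same class through two classloaders that do not delegate to a common parent produces two distinct Class objects, so an instance created via one loader fails any cast against the other -- the same mechanism by which io.netty.channel.epoll.EpollEventLoopGroup can fail to cast to io.netty.channel.EventLoopGroup when Netty exists both in the user jar and on the system classpath:

import java.net.URL;
import java.net.URLClassLoader;

public class ClassLoaderClash {
    public static void main(String[] args) throws Exception {
        // Two isolated loaders over the same (hypothetical) jar; parent = null
        // means neither delegates to the application classloader.
        URL[] jar = { new URL("file:plugin.jar") };
        try (URLClassLoader a = new URLClassLoader(jar, null);
             URLClassLoader b = new URLClassLoader(jar, null)) {
            Class<?> fromA = a.loadClass("com.example.Greeter");
            Class<?> fromB = b.loadClass("com.example.Greeter");
            Object instance = fromA.getDeclaredConstructor().newInstance();

            System.out.println(fromA == fromB);             // false: same name, different Class objects
            System.out.println(fromB.isInstance(instance)); // false: instance belongs to fromA's class
            fromB.cast(instance);                           // throws ClassCastException
        }
    }
}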
>>>>
>>>> On Wed, Dec 16, 2020 at 8:22 AM Dongwon Kim <eastcirc...@gmail.com> wrote:
>>>>
>>>>> I just added the following option to the script:
>>>>>
>>>>> -Dclassloader.parent-first-patterns.additional=org.apache.kafka.common.serialization
>>>>>
>>>>> Now it seems to work.
>>>>>
>>>>> Why do application mode and per-job cluster mode behave differently
>>>>> when it comes to classloading?
>>>>>
>>>>> Is it a bug, or intended?
>>>>>
>>>>> Best,
>>>>>
>>>>> Dongwon
>>>>>
>>>>> On Wed, Dec 16, 2020 at 3:59 PM Dongwon Kim <eastcirc...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I have an artifact that works perfectly fine in per-job cluster mode
>>>>>> with the following bash script:
>>>>>>
>>>>>> #!/bin/env bash
>>>>>>
>>>>>> export FLINK_CONF_DIR=./conf
>>>>>> export HADOOP_CLASSPATH=`hadoop classpath`
>>>>>>
>>>>>> $FLINK_HOME/bin/flink run -t yarn-per-job myjar.jar myconf.conf
>>>>>>
>>>>>> I tried application mode [1] using the exact same artifact with the
>>>>>> following script:
>>>>>>
>>>>>> #!/bin/env bash
>>>>>>
>>>>>> export FLINK_CONF_DIR=./conf
>>>>>> export HADOOP_CLASSPATH=`hadoop classpath`
>>>>>>
>>>>>> $FLINK_HOME/bin/flink run-application -t yarn-application \
>>>>>>   -Dyarn.provided.lib.dirs='hdfs:///flink-dists/flink-1.12.0/lib;hdfs:///flink-dists/flink-1.12.0/plugins' \
>>>>>>   -Dyarn.ship-files=myconf.conf \
>>>>>>   hdfs:///jars/myjar.jar myconf.conf
>>>>>>
>>>>>> but the job fails with the following exception:
>>>>>>
>>>>>> 2020-12-16 15:52:25,364 WARN org.apache.flink.runtime.taskmanager.Task [] - session-window -> (Sink: kafka-sink, Sink: session-window-late-data) (1/1)#0 (ee9fc1aa21833c749e3c271fd52cbfd4) switched from RUNNING to FAILED.
>>>>>> org.apache.kafka.common.KafkaException: Failed to construct kafka producer
>>>>>>   at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>>>>   at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>>>>   at org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:78) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>>>>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1158) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>>>>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1259) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>>>>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1255) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>>>>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:950) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>>>>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:100) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>>>>   at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:398) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>   at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:389) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1128) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>>>>   at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>   at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>   at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>   at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:107) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:264) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:400) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>   at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>   at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222]
>>>>>> Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
>>>>>>   at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:374) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>>>>   at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:392) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>>>>   at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:359) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>>>>   ... 23 more
>>>>>>
>>>>>> I have flink-connector-kafka_2.11 in my artifact and don't have it
>>>>>> under the Flink lib directory at all.
>>>>>>
>>>>>> Thanks in advance,
>>>>>>
>>>>>> p.s. Attached is the detailed log message from a TM.
>>>>>>
>>>>>> Dongwon
>>>>>>
>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/yarn.html#application-mode
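For reference, the fix Dongwon applied above (-Dclassloader.parent-first-patterns.additional=...) can also be set cluster-wide in flink-conf.yaml rather than on the command line; a sketch, assuming the Flink 1.12 option name used in the thread:

# flink-conf.yaml (sketch of the workaround applied above)
# Classes matching this prefix are loaded parent-first, i.e. from the system
# classpath instead of the user jar, so only one copy of the Kafka
# serialization interfaces can ever be loaded. Note this only helps while
# those classes are actually present on the system classpath.
classloader.parent-first-patterns.additional: org.apache.kafka.common.serialization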