I am not sure about the root cause, but it seems you could work around it by
forcing the default NIO-based transport[1]. Add
-Denv.java.opts="-Dcom.datastax.driver.FORCE_NIO=true" to your
submission command.
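
For example, the full submission might look like the following (just a sketch
assembled from the script you posted earlier; note that FORCE_NIO is a
property of the DataStax Cassandra driver, so whether it also affects
Lettuce's transport choice would need verifying):

$FLINK_HOME/bin/flink run-application -t yarn-application \
    -Denv.java.opts="-Dcom.datastax.driver.FORCE_NIO=true" \
    -Dyarn.provided.lib.dirs='hdfs:///flink-dists/flink-1.12.0/lib;hdfs:///flink-dists/flink-1.12.0/plugins' \
    -Dyarn.ship-files=myconf.conf \
    hdfs:///jars/myjar.jar myconf.conf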

[1] https://stackoverflow.com/questions/48762857/java-lang-classcastexception-netty-fail-on-jar-execution-on-flink

Best,
Yang

Dongwon Kim <eastcirc...@gmail.com> wrote on Wednesday, December 16, 2020 at 5:37 PM:

> Hi Yang,
>
> Thanks for the detailed explanation!
>
>> Could you add "-Dyarn.per-job-cluster.include-user-jar=DISABLED" to your
>> command and have a try? With that option, the user jars will no longer be
>> included in the system classpath.
>
>
> I tried the following as you suggested:
>
> #!/bin/env bash
>
> export FLINK_CONF_DIR=./conf
> export HADOOP_CLASSPATH=`hadoop classpath`
>
> $FLINK_HOME/bin/flink run-application -t yarn-application \
>     -Dyarn.provided.lib.dirs='hdfs:///flink-dists/flink-1.12.0/lib;hdfs:///flink-dists/flink-1.12.0/plugins' \
>     -Dyarn.ship-files=myconf.conf \
>     -Dyarn.per-job-cluster.include-user-jar=DISABLED \
>     hdfs:///jars/myjar.jar myconf.conf
>
> Unfortunately, this attempt fails with the following exception on TMs:
>
>> 2020-12-16 18:29:37,859 WARN  org.apache.flink.runtime.taskmanager.Task [] - enricher (1/1)#0 (add478e602e93e1720a3d92ebbab5cc6) switched from RUNNING to FAILED.
>> java.lang.ClassCastException: io.netty.channel.epoll.EpollEventLoopGroup cannot be cast to io.netty.channel.EventLoopGroup
>>     at io.lettuce.core.resource.DefaultEventLoopGroupProvider.getOrCreate(DefaultEventLoopGroupProvider.java:151) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>>     at io.lettuce.core.resource.DefaultEventLoopGroupProvider.allocate(DefaultEventLoopGroupProvider.java:89) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>>     at io.lettuce.core.AbstractRedisClient.doGetEventExecutor(AbstractRedisClient.java:275) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>>     at io.lettuce.core.AbstractRedisClient.getEventLoopGroup(AbstractRedisClient.java:264) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>>     at io.lettuce.core.AbstractRedisClient.channelType(AbstractRedisClient.java:246) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>>     at io.lettuce.core.RedisClient.connectStatefulAsync(RedisClient.java:315) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>>     at io.lettuce.core.RedisClient.connectStandaloneAsync(RedisClient.java:278) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>>     at io.lettuce.core.RedisClient.connect(RedisClient.java:211) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>>     at io.lettuce.core.RedisClient.connect(RedisClient.java:196) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>>     at com.kakaomobility.drivinghabit.stream.Enricher.open(Enricher.java:55) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>     at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.open(AsyncWaitOperator.java:154) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:401) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.11-1.12.0.jar:1.12.0]
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.11-1.12.0.jar:1.12.0]
>>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222]
>>     Suppressed: java.lang.ClassCastException: io.netty.channel.epoll.EpollEventLoopGroup cannot be cast to io.netty.util.concurrent.EventExecutorGroup
>>         at io.lettuce.core.resource.DefaultEventLoopGroupProvider.shutdown(DefaultEventLoopGroupProvider.java:292) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>>         at io.lettuce.core.resource.DefaultClientResources.shutdown(DefaultClientResources.java:648) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>>         at io.lettuce.core.AbstractRedisClient.closeClientResources(AbstractRedisClient.java:569) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>>         at io.lettuce.core.AbstractRedisClient.lambda$shutdownAsync$5(AbstractRedisClient.java:521) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>>         at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:981) ~[?:1.8.0_222]
>>         at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2124) ~[?:1.8.0_222]
>>         at io.lettuce.core.AbstractRedisClient.shutdownAsync(AbstractRedisClient.java:521) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>>         at io.lettuce.core.AbstractRedisClient.shutdown(AbstractRedisClient.java:485) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>>         at io.lettuce.core.AbstractRedisClient.shutdown(AbstractRedisClient.java:453) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>>         at com.kakaomobility.drivinghabit.stream.Enricher.close(Enricher.java:60) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>>         at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>         at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:740) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:720) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:643) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:552) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.11-1.12.0.jar:1.12.0]
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.11-1.12.0.jar:1.12.0]
>>         at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222]
>>
>
> The exception seems to come from another operator, not the Kafka one, and
> this operator performs async I/O using Lettuce, an asynchronous Redis client.
>
> Best,
>
> Dongwon
>
> On Wed, Dec 16, 2020 at 6:07 PM Yang Wang <danrtsey...@gmail.com> wrote:
>
>> Hi Dongwon,
>>
>> For application mode, the job submission happens on the JobManager side;
>> we use an embedded client to submit the job, so the user jar is added to
>> the distributed cache. When a task is deployed to a TaskManager, the user
>> jar is downloaded again and loaded in the user classloader even though it
>> is already on the system classpath.
>>
>> I think this is why these classes are loaded by different classloaders.
>>
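>> To confirm which classpath each class actually comes from, one option is
>> to turn on JVM class-loading logs (a sketch; -verbose:class is the
>> standard Java 8 switch, added to your existing submission command):
>>
>> # each class load is then logged with its source jar in the TM logs
>> $FLINK_HOME/bin/flink run-application -t yarn-application \
>>     -Denv.java.opts="-verbose:class" \
>>     ... rest of the command unchanged ...
>>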
>> For per-job mode, we recover the job instead, and the user jars are not
>> added to the distributed cache.
>>
>> Could you add "-Dyarn.per-job-cluster.include-user-jar=DISABLED" to your
>> command and have a try? With that option, the user jars will no longer be
>> included in the system classpath.
>>
>>
>> Best,
>> Yang
>>
>>
>>
>> Dongwon Kim <eastcirc...@gmail.com> wrote on Wednesday, December 16, 2020 at 4:20 PM:
>>
>>> Robert,
>>>
>>>> But if Kafka is really only available in the user jar, then this error
>>>> still should not occur.
>>>
>>> I think so too; it should not occur.
>>> I scanned through all the jar files on the classpath using `jar tf`, but
>>> no jar contains org.apache.kafka.common.serialization.Deserializer with a
>>> different version.
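>>>
>>> The scan was essentially the following (a sketch; the jar locations are
>>> assumptions and should be adjusted to the actual classpath):
>>>
>>> # print every jar that bundles the Kafka Deserializer interface
>>> for j in $FLINK_HOME/lib/*.jar myjar.jar; do
>>>   if jar tf "$j" | grep -q 'org/apache/kafka/common/serialization/Deserializer.class'; then
>>>     echo "$j"
>>>   fi
>>> done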
>>>
>>>> In your case it seems that the classes are loaded from different
>>>> classloaders.
>>>
>>> Hmm, why then did the same artifact work fine in per-job cluster mode?
>>>
>>> p.s. Another user seems to face the same problem:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Duplication-error-on-Kafka-Connector-Libraries-td39805.html#a39812
>>>
>>> Thanks,
>>>
>>> Dongwon
>>>
>>>
>>>
>>> On Wed, Dec 16, 2020 at 4:45 PM Robert Metzger <rmetz...@apache.org>
>>> wrote:
>>>
>>>> Hey Dongwon,
>>>>
>>>> I don't think this is the intended behavior.
>>>> I believe that in application mode we add the user jar to the system
>>>> classloader as well. In your case it seems that the classes are loaded
>>>> from different classloaders.
>>>> But if Kafka is really only available in the user jar, then this error
>>>> still should not occur.
>>>>
>>>>
>>>> On Wed, Dec 16, 2020 at 8:22 AM Dongwon Kim <eastcirc...@gmail.com>
>>>> wrote:
>>>>
>>>>> I just added the following option to the script:
>>>>>
>>>>>
>>>>> -Dclassloader.parent-first-patterns.additional=org.apache.kafka.common.serialization
>>>>>
>>>>> Now it seems to work.
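>>>>>
>>>>> For reference, the full submission command now looks roughly like this
>>>>> (the same script as before, only with the new -D option added):
>>>>>
>>>>> $FLINK_HOME/bin/flink run-application -t yarn-application \
>>>>>     -Dyarn.provided.lib.dirs='hdfs:///flink-dists/flink-1.12.0/lib;hdfs:///flink-dists/flink-1.12.0/plugins' \
>>>>>     -Dyarn.ship-files=myconf.conf \
>>>>>     -Dclassloader.parent-first-patterns.additional=org.apache.kafka.common.serialization \
>>>>>     hdfs:///jars/myjar.jar myconf.conf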
>>>>>
>>>>> Why do application mode and per-job cluster mode behave differently
>>>>> when it comes to classloading?
>>>>>
>>>>> Is it a bug? or intended?
>>>>>
>>>>> Best,
>>>>>
>>>>> Dongwon
>>>>>
>>>>> On Wed, Dec 16, 2020 at 3:59 PM Dongwon Kim <eastcirc...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I have an artifact that works perfectly fine in per-job cluster mode
>>>>>> with the following bash script:
>>>>>>
>>>>>> #!/bin/env bash
>>>>>>
>>>>>> export FLINK_CONF_DIR=./conf
>>>>>> export HADOOP_CLASSPATH=`hadoop classpath`
>>>>>>
>>>>>> $FLINK_HOME/bin/flink run -t yarn-per-job myjar.jar myconf.conf
>>>>>>
>>>>>> I tried Application Mode [1] using the exact same artifact with the
>>>>>> following script:
>>>>>>
>>>>>> #!/bin/env bash
>>>>>>
>>>>>> export FLINK_CONF_DIR=./conf
>>>>>> export HADOOP_CLASSPATH=`hadoop classpath`
>>>>>>
>>>>>> $FLINK_HOME/bin/flink run-application -t yarn-application \
>>>>>>     -Dyarn.provided.lib.dirs='hdfs:///flink-dists/flink-1.12.0/lib;hdfs:///flink-dists/flink-1.12.0/plugins' \
>>>>>>     -Dyarn.ship-files=myconf.conf \
>>>>>>     hdfs:///jars/myjar.jar myconf.conf
>>>>>>
>>>>>> but the job fails with the following exception:
>>>>>>
>>>>>> 2020-12-16 15:52:25,364 WARN  org.apache.flink.runtime.taskmanager.Task [] - session-window -> (Sink: kafka-sink, Sink: session-window-late-data) (1/1)#0 (ee9fc1aa21833c749e3c271fd52cbfd4) switched from RUNNING to FAILED.
>>>>>> org.apache.kafka.common.KafkaException: Failed to construct kafka producer
>>>>>>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>>>>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>>>>     at org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:78) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1158) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1259) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1255) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:950) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:100) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>>>>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:398) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:389) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1128) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>>>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:107) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:264) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:400) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222]
>>>>>> Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
>>>>>>     at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:374) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>>>>     at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:392) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>>>>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:359) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>>>>     ... 23 more
>>>>>>
>>>>>> I have flink-connector-kafka_2.11 bundled in my artifact and do not
>>>>>> have it under the Flink lib directory at all.
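>>>>>>
>>>>>> A quick way to double-check that (a sketch; the jar and lib paths are
>>>>>> assumptions):
>>>>>>
>>>>>> # the serializer should show up only inside the fat jar ...
>>>>>> jar tf myjar.jar | grep 'org/apache/kafka/common/serialization/ByteArraySerializer'
>>>>>> # ... and no Kafka jar should sit in the Flink lib directory
>>>>>> ls $FLINK_HOME/lib | grep -i kafka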
>>>>>>
>>>>>> Thanks in advance,
>>>>>>
>>>>>> P.S. Attached is the detailed log message from a TM.
>>>>>>
>>>>>> Dongwon
>>>>>>
>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/yarn.html#application-mode
>>>>>>
>>>>>>
>>>>>
