Re: failed w/ Application Mode but succeeded w/ Per-Job Cluster Mode

2020-12-15 Thread Dongwon Kim
I just added the following option to the script:

-Dclassloader.parent-first-patterns.additional=org.apache.kafka.common.serialization

Now it seems to work.

Why do the application mode and the per-job cluster mode behave differently
when it comes to the classloading?

Is it a bug? or intended?

Best,

Dongwon

On Wed, Dec 16, 2020 at 3:59 PM Dongwon Kim  wrote:

> Hi,
>
> I have an artifact which works perfectly fine with Per-Job Cluster Mode
> with the following bash script:
>
> #!/bin/env bash
>
> export FLINK_CONF_DIR=./conf
>
> export HADOOP_CLASSPATH=`hadoop classpath`
>
>
> $FLINK_HOME/bin/flink run -t yarn-per-job myjar.jar myconf.conf
>
> I tried Application Mode [1] using the exact same artifact with the
> following script:
>
> #!/bin/env bash
>
>
> export FLINK_CONF_DIR=./conf
>
> export HADOOP_CLASSPATH=`hadoop classpath`
>
>
> $FLINK_HOME/bin/flink run-application -t yarn-application \
>
> 
> -Dyarn.provided.lib.dirs='hdfs:///flink-dists/flink-1.12.0/lib;hdfs:///flink-dists/flink-1.12.0/plugins'
> \
>
> -Dyarn.ship-files=myconf.conf \
>
> hdfs:///jars/myjar.jar myconf.conf
>
> but the job fails with the following exception
>
> 2020-12-16 15:52:25,364 WARN  org.apache.flink.runtime.taskmanager.Task
>   [] - session-window -> (Sink: kafka-sink, Sink:
> session-window-late-data) (1/1)#0 (ee9fc1aa21833c749e3c271fd52cbfd4)
> switched from RUNNING to FAILED.
>
> org.apache.kafka.common.KafkaException: Failed to construct kafka producer
>
> at
> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:432)
> ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>
> at
> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:298)
> ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>
> at
> org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.(FlinkKafkaInternalProducer.java:78)
> ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1158)
> ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1259)
> ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1255)
> ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:950)
> ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:100)
> ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>
> at
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:398)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>
> at
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:389)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1128)
> ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>
> at
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>
> at
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>
> at
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:107)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:264)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:400)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>
> at org.apache.flin

Re: failed w/ Application Mode but succeeded w/ Per-Job Cluster Mode

2020-12-16 Thread Dongwon Kim
Robert,

But if Kafka is really only available in the user jar, then this error
> still should not occur.

I think so too; it should not occur.
I scan through all the jar files in the classpath using `jar tf` but no jar
contains org.apache.kafka.common.serialization.Deserializer with a
different version.

In your case it seems that the classes are loaded from different
> classloaders.

Hmm, why did the artifact work fine with per-job cluster mode?

p.s. Another user seems to face the same problem:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Duplication-error-on-Kafka-Connector-Libraries-td39805.html#a39812

Thanks,

Dongwon



On Wed, Dec 16, 2020 at 4:45 PM Robert Metzger  wrote:

> Hey Dongwon,
>
> I don't think this is the intended behavior.
> I believe in application mode, we are adding the user jar into the system
> classloader as well. In your case it seems that the classes are loaded from
> different classloaders.
> But if Kafka is really only available in the user jar, then this error
> still should not occur.
>
>
> On Wed, Dec 16, 2020 at 8:22 AM Dongwon Kim  wrote:
>
>> I just added the following option to the script:
>>
>>
>> -Dclassloader.parent-first-patterns.additional=org.apache.kafka.common.serialization
>>
>> Now it seems to work.
>>
>> Why do the application mode and the per-job cluster mode behave
>> differently when it comes to the classloading?
>>
>> Is it a bug? or intended?
>>
>> Best,
>>
>> Dongwon
>>
>> On Wed, Dec 16, 2020 at 3:59 PM Dongwon Kim 
>> wrote:
>>
>>> Hi,
>>>
>>> I have an artifact which works perfectly fine with Per-Job Cluster Mode
>>> with the following bash script:
>>>
>>> #!/bin/env bash
>>>
>>> export FLINK_CONF_DIR=./conf
>>>
>>> export HADOOP_CLASSPATH=`hadoop classpath`
>>>
>>>
>>> $FLINK_HOME/bin/flink run -t yarn-per-job myjar.jar myconf.conf
>>>
>>> I tried Application Mode [1] using the exact same artifact with the
>>> following script:
>>>
>>> #!/bin/env bash
>>>
>>>
>>> export FLINK_CONF_DIR=./conf
>>>
>>> export HADOOP_CLASSPATH=`hadoop classpath`
>>>
>>>
>>> $FLINK_HOME/bin/flink run-application -t yarn-application \
>>>
>>> 
>>> -Dyarn.provided.lib.dirs='hdfs:///flink-dists/flink-1.12.0/lib;hdfs:///flink-dists/flink-1.12.0/plugins'
>>> \
>>>
>>> -Dyarn.ship-files=myconf.conf \
>>>
>>> hdfs:///jars/myjar.jar myconf.conf
>>>
>>> but the job fails with the following exception
>>>
>>> 2020-12-16 15:52:25,364 WARN  org.apache.flink.runtime.taskmanager.Task
>>>   [] - session-window -> (Sink: kafka-sink, Sink:
>>> session-window-late-data) (1/1)#0 (ee9fc1aa21833c749e3c271fd52cbfd4)
>>> switched from RUNNING to FAILED.
>>>
>>> org.apache.kafka.common.KafkaException: Failed to construct kafka
>>> producer
>>>
>>> at
>>> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:432)
>>> ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>
>>> at
>>> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:298)
>>> ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>
>>> at
>>> org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.(FlinkKafkaInternalProducer.java:78)
>>> ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>
>>> at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1158)
>>> ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>
>>> at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1259)
>>> ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>
>>> at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1255)
>>> ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>
>>> at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:950)
>>> ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>
>>> at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:100)
>>> ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>
>>> at
>>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:398)
>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>
>>> at
>>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:389)
>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>
>>> at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1128)
>>> ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>
>>> at
>>> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>
>>> at
>>> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
>>> 

Re: failed w/ Application Mode but succeeded w/ Per-Job Cluster Mode

2020-12-16 Thread Yang Wang
Hi Dongwon,

For application mode, the job submission happens in the JobManager side. We
are using an embedded client
to submit the job. So the user jar will be added to distributed cache. When
deploying a task to TaskManager,
it will be downloaded again and run in user classloader even though we
already have it in the system classpath.

I think it might be the reason why these classes are loaded by different
classloaders.

For per-job mode, we are recovering the job and the user jars will not be
added to distributed cache.

Could you add "-Dyarn.per-job-cluster.include-user-jar=DISABLED" to your
command and have a try? After that, we
will disable the user jars including in the system classpath.


Best,
Yang



Dongwon Kim  于2020年12月16日周三 下午4:20写道:

> Robert,
>
> But if Kafka is really only available in the user jar, then this error
>> still should not occur.
>
> I think so too; it should not occur.
> I scan through all the jar files in the classpath using `jar tf` but no
> jar contains org.apache.kafka.common.serialization.Deserializer with a
> different version.
>
> In your case it seems that the classes are loaded from different
>> classloaders.
>
> Hmm, why did the artifact work fine with per-job cluster mode?
>
> p.s. Another user seems to face the same problem:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Duplication-error-on-Kafka-Connector-Libraries-td39805.html#a39812
>
> Thanks,
>
> Dongwon
>
>
>
> On Wed, Dec 16, 2020 at 4:45 PM Robert Metzger 
> wrote:
>
>> Hey Dongwon,
>>
>> I don't think this is the intended behavior.
>> I believe in application mode, we are adding the user jar into the system
>> classloader as well. In your case it seems that the classes are loaded from
>> different classloaders.
>> But if Kafka is really only available in the user jar, then this error
>> still should not occur.
>>
>>
>> On Wed, Dec 16, 2020 at 8:22 AM Dongwon Kim 
>> wrote:
>>
>>> I just added the following option to the script:
>>>
>>>
>>> -Dclassloader.parent-first-patterns.additional=org.apache.kafka.common.serialization
>>>
>>> Now it seems to work.
>>>
>>> Why do the application mode and the per-job cluster mode behave
>>> differently when it comes to the classloading?
>>>
>>> Is it a bug? or intended?
>>>
>>> Best,
>>>
>>> Dongwon
>>>
>>> On Wed, Dec 16, 2020 at 3:59 PM Dongwon Kim 
>>> wrote:
>>>
 Hi,

 I have an artifact which works perfectly fine with Per-Job Cluster Mode
 with the following bash script:

 #!/bin/env bash

 export FLINK_CONF_DIR=./conf

 export HADOOP_CLASSPATH=`hadoop classpath`


 $FLINK_HOME/bin/flink run -t yarn-per-job myjar.jar myconf.conf

 I tried Application Mode [1] using the exact same artifact with the
 following script:

 #!/bin/env bash


 export FLINK_CONF_DIR=./conf

 export HADOOP_CLASSPATH=`hadoop classpath`


 $FLINK_HOME/bin/flink run-application -t yarn-application \

 
 -Dyarn.provided.lib.dirs='hdfs:///flink-dists/flink-1.12.0/lib;hdfs:///flink-dists/flink-1.12.0/plugins'
 \

 -Dyarn.ship-files=myconf.conf \

 hdfs:///jars/myjar.jar myconf.conf

 but the job fails with the following exception

 2020-12-16 15:52:25,364 WARN  org.apache.flink.runtime.taskmanager.Task
   [] - session-window -> (Sink: kafka-sink, Sink:
 session-window-late-data) (1/1)#0 (ee9fc1aa21833c749e3c271fd52cbfd4)
 switched from RUNNING to FAILED.

 org.apache.kafka.common.KafkaException: Failed to construct kafka
 producer

 at
 org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:432)
 ~[stream-calculator-0.1-SNAPSHOT.jar:?]

 at
 org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:298)
 ~[stream-calculator-0.1-SNAPSHOT.jar:?]

 at
 org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.(FlinkKafkaInternalProducer.java:78)
 ~[stream-calculator-0.1-SNAPSHOT.jar:?]

 at
 org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1158)
 ~[stream-calculator-0.1-SNAPSHOT.jar:?]

 at
 org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1259)
 ~[stream-calculator-0.1-SNAPSHOT.jar:?]

 at
 org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1255)
 ~[stream-calculator-0.1-SNAPSHOT.jar:?]

 at
 org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:950)
 ~[stream-calculator-0.1-SNAPSHOT.jar:?]

 at
 org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:100)
 ~[stream-calculator-0

Re: failed w/ Application Mode but succeeded w/ Per-Job Cluster Mode

2020-12-16 Thread Dongwon Kim
Hi Yang,

Thanks for the detailed explanation!

Could you add "-Dyarn.per-job-cluster.include-user-jar=DISABLED" to your
> command and have a try? After that, we
> will disable the user jars including in the system classpath.


I tried the following as you suggested:

#!/bin/env bash


export FLINK_CONF_DIR=./conf

export HADOOP_CLASSPATH=`hadoop classpath`


$FLINK_HOME/bin/flink run-application -t yarn-application \


  
-Dyarn.provided.lib.dirs='hdfs:///flink-dists/flink-1.12.0/lib;hdfs:///flink-dists/flink-1.12.0/plugins'
\

-Dyarn.ship-files=myconf.conf \

-Dyarn.per-job-cluster.include-user-jar=DISABLED \

hdfs:///jars/myjar.jar myconf.conf

Unfortunately, this attempt fails with the following exception on TMs:

> 2020-12-16 18:29:37,859 WARN  org.apache.flink.runtime.taskmanager.Task
>  [] - enricher (1/1)#0 (add478e602e93e1720a3d92ebbab5cc6)
> switched from RUNNING to FAILED.
> java.lang.ClassCastException: io.netty.channel.epoll.EpollEventLoopGroup
> cannot be cast to io.netty.channel.EventLoopGroup
> at
> io.lettuce.core.resource.DefaultEventLoopGroupProvider.getOrCreate(DefaultEventLoopGroupProvider.java:151)
> ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
> at
> io.lettuce.core.resource.DefaultEventLoopGroupProvider.allocate(DefaultEventLoopGroupProvider.java:89)
> ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
> at
> io.lettuce.core.AbstractRedisClient.doGetEventExecutor(AbstractRedisClient.java:275)
> ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
> at
> io.lettuce.core.AbstractRedisClient.getEventLoopGroup(AbstractRedisClient.java:264)
> ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
> at
> io.lettuce.core.AbstractRedisClient.channelType(AbstractRedisClient.java:246)
> ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
> at io.lettuce.core.RedisClient.connectStatefulAsync(RedisClient.java:315)
> ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
> at
> io.lettuce.core.RedisClient.connectStandaloneAsync(RedisClient.java:278)
> ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
> at io.lettuce.core.RedisClient.connect(RedisClient.java:211)
> ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
> at io.lettuce.core.RedisClient.connect(RedisClient.java:196)
> ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
> at com.kakaomobility.drivinghabit.stream.Enricher.open(Enricher.java:55)
> ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.open(AsyncWaitOperator.java:154)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:401)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
> [flink-dist_2.11-1.12.0.jar:1.12.0]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
> [flink-dist_2.11-1.12.0.jar:1.12.0]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222]
> Suppressed: java.lang.ClassCastException:
> io.netty.channel.epoll.EpollEventLoopGroup cannot be cast to
> io.netty.util.concurrent.EventExecutorGroup
> at
> io.lettuce.core.resource.DefaultEventLoopGroupProvider.shutdown(DefaultEventLoopGroupProvider.java:292)
> ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
> at
> io.lettuce.core.resource.DefaultClientResources.shutdown(DefaultClientResources.java:648)
> ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
> at
> io.lettuce.core.AbstractRedisClient.closeClientResources(AbstractRedisClient.java:569)
> ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
> at
> io.lettuce.core.AbstractRedisClient.lambda$shutdownAsync$5(AbstractRedis

Re: failed w/ Application Mode but succeeded w/ Per-Job Cluster Mode

2020-12-16 Thread Yang Wang
I am not sure about the root cause, but it seems that you could force the
default NIO-based transport to work around[1].
Add -Denv.java.opts="-Dcom.datastax.driver.FORCE_NIO=true" to your
submission commands.

[1].
https://stackoverflow.com/questions/48762857/java-lang-classcastexception-netty-fail-on-jar-execution-on-flink

Best,
Yang

Dongwon Kim  于2020年12月16日周三 下午5:37写道:

> Hi Yang,
>
> Thanks for the detailed explanation!
>
> Could you add "-Dyarn.per-job-cluster.include-user-jar=DISABLED" to your
>> command and have a try? After that, we
>> will disable the user jars including in the system classpath.
>
>
> I tried the following as you suggested:
>
> #!/bin/env bash
>
>
> export FLINK_CONF_DIR=./conf
>
> export HADOOP_CLASSPATH=`hadoop classpath`
>
>
> $FLINK_HOME/bin/flink run-application -t yarn-application \
>
>
>   
> -Dyarn.provided.lib.dirs='hdfs:///flink-dists/flink-1.12.0/lib;hdfs:///flink-dists/flink-1.12.0/plugins'
> \
>
> -Dyarn.ship-files=myconf.conf \
>
> -Dyarn.per-job-cluster.include-user-jar=DISABLED \
>
> hdfs:///jars/myjar.jar myconf.conf
>
> Unfortunately, this attempt fails with the following exception on TMs:
>
>> 2020-12-16 18:29:37,859 WARN  org.apache.flink.runtime.taskmanager.Task
>>  [] - enricher (1/1)#0 (add478e602e93e1720a3d92ebbab5cc6)
>> switched from RUNNING to FAILED.
>> java.lang.ClassCastException: io.netty.channel.epoll.EpollEventLoopGroup
>> cannot be cast to io.netty.channel.EventLoopGroup
>> at
>> io.lettuce.core.resource.DefaultEventLoopGroupProvider.getOrCreate(DefaultEventLoopGroupProvider.java:151)
>> ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>> at
>> io.lettuce.core.resource.DefaultEventLoopGroupProvider.allocate(DefaultEventLoopGroupProvider.java:89)
>> ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>> at
>> io.lettuce.core.AbstractRedisClient.doGetEventExecutor(AbstractRedisClient.java:275)
>> ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>> at
>> io.lettuce.core.AbstractRedisClient.getEventLoopGroup(AbstractRedisClient.java:264)
>> ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>> at
>> io.lettuce.core.AbstractRedisClient.channelType(AbstractRedisClient.java:246)
>> ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>> at io.lettuce.core.RedisClient.connectStatefulAsync(RedisClient.java:315)
>> ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>> at
>> io.lettuce.core.RedisClient.connectStandaloneAsync(RedisClient.java:278)
>> ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>> at io.lettuce.core.RedisClient.connect(RedisClient.java:211)
>> ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>> at io.lettuce.core.RedisClient.connect(RedisClient.java:196)
>> ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>> at com.kakaomobility.drivinghabit.stream.Enricher.open(Enricher.java:55)
>> ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
>> at
>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>> at
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>> at
>> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.open(AsyncWaitOperator.java:154)
>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:401)
>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>> [flink-dist_2.11-1.12.0.jar:1.12.0]
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>> [flink-dist_2.11-1.12.0.jar:1.12.0]
>> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222]
>> Suppressed: java.lang.ClassCastException:
>> io.netty.channel.epoll.EpollEventLoopGroup cannot be cast to
>> io.netty.util.concurrent.EventExecutorGroup
>> at
>> io.lettuce.core.resource.DefaultEventLoopGroupProvider.shutdown(DefaultEventLoopGroupProvider.java:292)
>> ~[blob_p-ad05abf47895f79601474d976f6b3a