I just added the following option to the script:

-Dclassloader.parent-first-patterns.additional=org.apache.kafka.common.serialization

Now it seems to work.
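
For reference, this is just the run-application command from my previous mail
(quoted below) with the new option added; the paths and file names are
unchanged:

$FLINK_HOME/bin/flink run-application -t yarn-application \
    -Dclassloader.parent-first-patterns.additional=org.apache.kafka.common.serialization \
    -Dyarn.provided.lib.dirs='hdfs:///flink-dists/flink-1.12.0/lib;hdfs:///flink-dists/flink-1.12.0/plugins' \
    -Dyarn.ship-files=myconf.conf \
    hdfs:///jars/myjar.jar myconf.conf

(I believe the same setting could also go into conf/flink-conf.yaml as
"classloader.parent-first-patterns.additional: org.apache.kafka.common.serialization"
instead of being passed with -D.)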

Why do application mode and per-job cluster mode behave differently
when it comes to classloading?

Is it a bug, or is it intended?

Best,

Dongwon

On Wed, Dec 16, 2020 at 3:59 PM Dongwon Kim <eastcirc...@gmail.com> wrote:

> Hi,
>
> I have an artifact that works perfectly fine in per-job cluster mode with
> the following bash script:
>
> #!/bin/env bash
>
> export FLINK_CONF_DIR=./conf
> export HADOOP_CLASSPATH=`hadoop classpath`
>
> $FLINK_HOME/bin/flink run -t yarn-per-job myjar.jar myconf.conf
>
> I tried Application Mode [1] using the exact same artifact with the
> following script:
>
> #!/bin/env bash
>
> export FLINK_CONF_DIR=./conf
> export HADOOP_CLASSPATH=`hadoop classpath`
>
> $FLINK_HOME/bin/flink run-application -t yarn-application \
>     -Dyarn.provided.lib.dirs='hdfs:///flink-dists/flink-1.12.0/lib;hdfs:///flink-dists/flink-1.12.0/plugins' \
>     -Dyarn.ship-files=myconf.conf \
>     hdfs:///jars/myjar.jar myconf.conf
>
> but the job fails with the following exception:
>
> 2020-12-16 15:52:25,364 WARN  org.apache.flink.runtime.taskmanager.Task [] - session-window -> (Sink: kafka-sink, Sink: session-window-late-data) (1/1)#0 (ee9fc1aa21833c749e3c271fd52cbfd4) switched from RUNNING to FAILED.
>
> org.apache.kafka.common.KafkaException: Failed to construct kafka producer
>         at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>         at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>         at org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:78) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1158) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1259) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1255) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:950) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:100) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>         at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:398) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:389) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1128) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>         at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:107) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:264) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:400) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.11-1.12.0.jar:1.12.0]
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.11-1.12.0.jar:1.12.0]
>         at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222]
> Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
>         at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:374) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>         at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:392) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>         at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:359) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>         ... 23 more
>
> I have flink-connector-kafka_2.11 bundled in my artifact and do not have it
> under the Flink lib directory at all.
>
> Thanks in advance,
>
> P.S. The attachment is the detailed log message from a TM.
>
> Dongwon
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/yarn.html#application-mode
>
>
