Robert,

> But if Kafka is really only available in the user jar, then this error
> still should not occur.

I think so too; it should not occur. I scanned through all the jar files on
the classpath using `jar tf`, but no jar contains
org.apache.kafka.common.serialization.Deserializer with a different version.
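The scan was roughly along these lines (the directory below is only an
illustration; I pointed it at the actual classpath entries of the TM container):

# the path below is just a placeholder for the container's classpath jars
for j in /path/to/tm-classpath/*.jar; do
  if jar tf "$j" | grep -q 'org/apache/kafka/common/serialization/Deserializer.class'; then
    echo "contains Deserializer: $j"
  fi
done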

> In your case it seems that the classes are loaded from different
> classloaders.

Hmm, why did the artifact work fine with per-job cluster mode?

p.s. Another user seems to face the same problem:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Duplication-error-on-Kafka-Connector-Libraries-td39805.html#a39812

Thanks,

Dongwon

On Wed, Dec 16, 2020 at 4:45 PM Robert Metzger <[email protected]> wrote:

> Hey Dongwon,
>
> I don't think this is the intended behavior.
> I believe in application mode, we are adding the user jar into the system
> classloader as well. In your case it seems that the classes are loaded
> from different classloaders.
> But if Kafka is really only available in the user jar, then this error
> still should not occur.
>
> On Wed, Dec 16, 2020 at 8:22 AM Dongwon Kim <[email protected]> wrote:
>
>> I just added the following option to the script:
>>
>> -Dclassloader.parent-first-patterns.additional=org.apache.kafka.common.serialization
>>
>> Now it seems to work.
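>>
>> (With that option, the full launch command is roughly the one from my
>> first mail below plus the extra -D flag:)
>>
>> $FLINK_HOME/bin/flink run-application -t yarn-application \
>>   -Dclassloader.parent-first-patterns.additional=org.apache.kafka.common.serialization \
>>   -Dyarn.provided.lib.dirs='hdfs:///flink-dists/flink-1.12.0/lib;hdfs:///flink-dists/flink-1.12.0/plugins' \
>>   -Dyarn.ship-files=myconf.conf \
>>   hdfs:///jars/myjar.jar myconf.conf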
>>
>> Why do the application mode and the per-job cluster mode behave
>> differently when it comes to the classloading?
>>
>> Is it a bug? or intended?
>>
>> Best,
>>
>> Dongwon
>>
>> On Wed, Dec 16, 2020 at 3:59 PM Dongwon Kim <[email protected]> wrote:
>>
>>> Hi,
>>>
>>> I have an artifact which works perfectly fine with Per-Job Cluster Mode
>>> with the following bash script:
>>>
>>> #!/bin/env bash
>>>
>>> export FLINK_CONF_DIR=./conf
>>> export HADOOP_CLASSPATH=`hadoop classpath`
>>>
>>> $FLINK_HOME/bin/flink run -t yarn-per-job myjar.jar myconf.conf
>>>
>>> I tried Application Mode [1] using the exact same artifact with the
>>> following script:
>>>
>>> #!/bin/env bash
>>>
>>> export FLINK_CONF_DIR=./conf
>>> export HADOOP_CLASSPATH=`hadoop classpath`
>>>
>>> $FLINK_HOME/bin/flink run-application -t yarn-application \
>>>   -Dyarn.provided.lib.dirs='hdfs:///flink-dists/flink-1.12.0/lib;hdfs:///flink-dists/flink-1.12.0/plugins' \
>>>   -Dyarn.ship-files=myconf.conf \
>>>   hdfs:///jars/myjar.jar myconf.conf
>>>
>>> but the job fails with the following exception:
>>>
>>> 2020-12-16 15:52:25,364 WARN org.apache.flink.runtime.taskmanager.Task [] - session-window -> (Sink: kafka-sink, Sink: session-window-late-data) (1/1)#0 (ee9fc1aa21833c749e3c271fd52cbfd4) switched from RUNNING to FAILED.
>>> org.apache.kafka.common.KafkaException: Failed to construct kafka producer
>>>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>     at org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:78) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1158) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1259) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1255) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:950) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:100) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:398) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:389) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1128) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:107) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:264) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:400) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.11-1.12.0.jar:1.12.0]
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.11-1.12.0.jar:1.12.0]
>>>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222]
>>> Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
>>>     at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:374) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>     at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:392) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:359) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>     ... 23 more
>>>
>>> I have flink-connector-kafka_2.11 in my artifact and don't have it under
>>> the flink lib directory at all.
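>>>
>>> (For reference, this can be double-checked with something like the
>>> following; myjar.jar here is a local copy of the artifact and the HDFS
>>> path is the provided lib dir from the script above:)
>>>
>>> # no kafka connector jar in the provided Flink lib dir on HDFS
>>> hdfs dfs -ls hdfs:///flink-dists/flink-1.12.0/lib | grep -i kafka
>>> # the kafka classes are bundled only inside the artifact
>>> jar tf myjar.jar | grep -c 'org/apache/kafka/'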
>>>
>>> Thanks in advance,
>>>
>>> p.s. the attached is the detailed log message from a TM
>>>
>>> Dongwon
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/yarn.html#application-mode
