I just added the following option to the script:

    -Dclassloader.parent-first-patterns.additional=org.apache.kafka.common.serialization
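(For reference, the same setting can also be made persistent in flink-conf.yaml instead of being passed on the command line each time; a sketch, assuming the default conf-dir layout:)

    # flink-conf.yaml -- load these classes parent-first, in addition to the defaults
    classloader.parent-first-patterns.additional: org.apache.kafka.common.serialization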
Now it seems to work. Why do application mode and per-job cluster mode behave differently when it comes to classloading? Is this a bug, or is it intended?

Best,

Dongwon

On Wed, Dec 16, 2020 at 3:59 PM Dongwon Kim <eastcirc...@gmail.com> wrote:
> Hi,
>
> I have an artifact which works perfectly fine in per-job cluster mode
> with the following bash script:
>
>     #!/bin/env bash
>     export FLINK_CONF_DIR=./conf
>     export HADOOP_CLASSPATH=`hadoop classpath`
>     $FLINK_HOME/bin/flink run -t yarn-per-job myjar.jar myconf.conf
>
> I tried application mode [1] using the exact same artifact with the
> following script:
>
>     #!/bin/env bash
>     export FLINK_CONF_DIR=./conf
>     export HADOOP_CLASSPATH=`hadoop classpath`
>     $FLINK_HOME/bin/flink run-application -t yarn-application \
>       -Dyarn.provided.lib.dirs='hdfs:///flink-dists/flink-1.12.0/lib;hdfs:///flink-dists/flink-1.12.0/plugins' \
>       -Dyarn.ship-files=myconf.conf \
>       hdfs:///jars/myjar.jar myconf.conf
>
> but the job fails with the following exception:
>
> 2020-12-16 15:52:25,364 WARN  org.apache.flink.runtime.taskmanager.Task [] - session-window -> (Sink: kafka-sink, Sink: session-window-late-data) (1/1)#0 (ee9fc1aa21833c749e3c271fd52cbfd4) switched from RUNNING to FAILED.
> org.apache.kafka.common.KafkaException: Failed to construct kafka producer
>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>     at org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:78) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1158) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1259) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1255) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:950) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:100) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:398) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:389) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1128) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:107) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:264) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:400) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.11-1.12.0.jar:1.12.0]
>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222]
> Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
>     at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:374) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>     at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:392) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:359) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>     ... 23 more
>
> I have flink-connector-kafka_2.11 in my artifact and don't have it under the Flink lib directory at all.
>
> Thanks in advance,
>
> p.s. the attached is the detailed log message from a TM
>
> Dongwon
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/yarn.html#application-mode
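A side note on the symptom itself: an error of the form "class X is not an instance of Y", where both sides carry the same fully qualified name, is the classic sign of the same class being defined by two different classloaders, since JVM class identity is the pair (defining loader, class name). Making the pattern parent-first forces both lookups through one loader. A rough analogy, not JVM code, can be sketched in Python by loading one source file twice under different module names (all names below are made up for the illustration):

```python
import importlib.util
import os
import tempfile

# One source file standing in for the serializer class in the user jar.
src = "class Serializer:\n    pass\n"
with tempfile.NamedTemporaryFile("w", suffix=".py", delete=False) as f:
    f.write(src)
    path = f.name

def load(name):
    # Each call plays the role of a separate classloader defining its own copy.
    spec = importlib.util.spec_from_file_location(name, path)
    mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)
    return mod

a = load("copy_a")  # stand-in for the user-code classloader
b = load("copy_b")  # stand-in for the parent (Flink lib) classloader

obj = a.Serializer()
print(obj.__class__.__name__ == b.Serializer.__name__)  # True: identical name
print(isinstance(obj, b.Serializer))                    # False: distinct class objects
os.unlink(path)
```

The second check fails even though the names match, which mirrors the `instanceof` failure in the stack trace above.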