I'm using Flink 1.10 and Kafka (AWS MSK) 2.2 and trying to do a simple app
that consumes from one kafka topic and produces events into another topic.
I would like to utilize the exactly_once semantic, however, I am
experiencing the following error:
org.apache.kafka.common.KafkaException:
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at
java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:593)
at
java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
at
java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
at
java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at
java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
at
java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:650)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.abortTransactions(FlinkKafkaProducer.java:1099)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1036)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: Failed to construct
kafka producer
at
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)
at
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
at
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:76)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.lambda$abortTransactions$2(FlinkKafkaProducer.java:1107)
at
java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.
common.serialization.ByteArraySerializer is not an instance of
org.apache.kafka.common.serialization.Serializer
at
org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:374)
at
org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:392)
at
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:359)
... 12 more
My producer is defined as
new FlinkKafkaProducer[String](
appArgs.authTxnTopic, // target topic
new KeyedSerializationSchemaWrapper[String](
new SimpleStringSchema()
), // serialization schema
kafkaProdProps, // producer config,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE
)
if I remove the exactly_once semantic as below, it works.
new FlinkKafkaProducer[String](
appArgs.authTxnTopic, // target topic
new KeyedSerializationSchemaWrapper[String](
new SimpleStringSchema()
), // serialization schema
kafkaProdProps // producer config
)
I don't understand what I'm incorrectly, as this should work based on the
docs. I see a similar experience noted here:
https://stackoverflow.com/questions/62466188/flink-kafka-exactly-once-causing-kafkaexception-bytearrayserializer-is-not-an-in
without
any answer as well.
Appreciate the help.