[ https://issues.apache.org/jira/browse/FLINK-24591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Krzysztof Dziolak updated FLINK-24591:
--------------------------------------
    Description: 
We have identified an issue when using the Kafka Producer with Flink 1.13 under `cluster.intercept-user-system-exit`:

{code:java}
java.lang.SecurityException: java.lang.SecurityException: setContextClassLoader
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
	at java.base/java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:600)
	at java.base/java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:678)
	at java.base/java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:737)
	at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173)
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
	at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
	at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:661)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.abortTransactions(FlinkKafkaProducer.java:1263)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1189)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:118)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:441)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:764)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:571)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.SecurityException: setContextClassLoader
	at java.base/java.util.concurrent.ForkJoinWorkerThread$InnocuousForkJoinWorkerThread.setContextClassLoader(ForkJoinWorkerThread.java:240)
	at org.apache.flink.util.TemporaryClassLoaderContext.of(TemporaryClassLoaderContext.java:61)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.lambda$abortTransactions$3(FlinkKafkaProducer.java:1270)
	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
	at java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1603)
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
	at java.base/java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
	at java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:746)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
{code}

Look in the attachments for a reproducer. The same data serialized to a file works fine (see the filesystem example in the reproducer).

  was:
A user reported on the [mailing list|https://lists.apache.org/thread.html/re38a07f6121cc580737a20c11574719cfe554e58d99817f79db9bb4a%40%3Cuser.flink.apache.org%3E] that Avro deserialization fails when using Kafka, Avro and the Confluent Schema Registry:

{code:java}
Caused by: java.io.IOException: Failed to deserialize Avro record.
	at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
	at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
	at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
	at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
	at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
	at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
Caused by: org.apache.avro.AvroTypeException: Found my.type.avro.MyEnumType, expecting union
	at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
	at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
	at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
	at org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
	at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
	at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:81)
	at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
	... 9 more
{code}

Look in the attachments for a reproducer. The same data serialized to a file works fine (see the filesystem example in the reproducer).
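A minimal, self-contained sketch of the suspected mechanism (not Flink code; the class name InnocuousThreadRepro and the txn-* values are made up for illustration). The stack trace shows that FlinkKafkaProducer#abortTransactions iterates with a parallel stream (which runs on the common ForkJoinPool) and that TemporaryClassLoaderContext.of calls setContextClassLoader on the worker thread. On JDK 11 the common pool uses InnocuousForkJoinWorkerThread workers as soon as any SecurityManager is installed (presumably what enabling cluster.intercept-user-system-exit does via Flink's security manager), and those workers throw SecurityException("setContextClassLoader") unconditionally. Under those assumptions, this standalone JDK 11 program should show the same exception without Flink:

{code:java}
import java.security.Permission;
import java.util.List;

public class InnocuousThreadRepro {

    public static void main(String[] args) {
        // Any SecurityManager will do -- this one grants every permission.
        // Its mere presence makes the lazily initialised common ForkJoinPool
        // create InnocuousForkJoinWorkerThread workers.
        System.setSecurityManager(new SecurityManager() {
            @Override
            public void checkPermission(Permission perm) {
                // allow everything; only the presence of the manager matters
            }
        });

        ClassLoader appLoader = InnocuousThreadRepro.class.getClassLoader();

        // Parallel streams run on the common ForkJoinPool, mirroring the
        // parallel forEach inside FlinkKafkaProducer#abortTransactions.
        List.of("txn-1", "txn-2", "txn-3", "txn-4").parallelStream().forEach(txn -> {
            try {
                // TemporaryClassLoaderContext.of(...) essentially does this:
                // swap the current thread's context class loader.
                Thread.currentThread().setContextClassLoader(appLoader);
                System.out.println(txn + ": swapped class loader on "
                        + Thread.currentThread().getName());
            } catch (SecurityException e) {
                // On InnocuousForkJoinWorkerThread workers this throws
                // "java.lang.SecurityException: setContextClassLoader".
                System.out.println(txn + ": " + e + " on "
                        + Thread.currentThread().getName());
            }
        });
    }
}
{code}

On a multi-core JVM this is expected to print the SecurityException for the elements handled by the pool's innocuous workers (elements picked up by the calling thread succeed). If this is indeed the mechanism, a fix would likely have to make abortTransactions avoid the common pool or tolerate threads that refuse setContextClassLoader, rather than requiring users to relax the security setting.
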
> Kafka Producer fails with SecurityException when using
> cluster.intercept-user-system-exit
> -----------------------------------------------------------------------------------------
>
>                 Key: FLINK-24591
>                 URL: https://issues.apache.org/jira/browse/FLINK-24591
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.13.2
>            Reporter: Krzysztof Dziolak
>            Priority: Major
>
> We have identified an issue when using the Kafka Producer with Flink 1.13 under `cluster.intercept-user-system-exit`:
> {code:java}
> java.lang.SecurityException: java.lang.SecurityException: setContextClassLoader
> 	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> 	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> 	at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> 	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
> 	at java.base/java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:600)
> 	at java.base/java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:678)
> 	at java.base/java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:737)
> 	at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
> 	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173)
> 	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
> 	at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
> 	at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:661)
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.abortTransactions(FlinkKafkaProducer.java:1263)
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1189)
> 	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
> 	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
> 	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
> 	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:118)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:441)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
> 	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:764)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:571)
> 	at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: java.lang.SecurityException: setContextClassLoader
> 	at java.base/java.util.concurrent.ForkJoinWorkerThread$InnocuousForkJoinWorkerThread.setContextClassLoader(ForkJoinWorkerThread.java:240)
> 	at org.apache.flink.util.TemporaryClassLoaderContext.of(TemporaryClassLoaderContext.java:61)
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.lambda$abortTransactions$3(FlinkKafkaProducer.java:1270)
> 	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
> 	at java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1603)
> 	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
> 	at java.base/java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
> 	at java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:746)
> 	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
> 	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
> 	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
> 	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
> 	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
> {code}
> Look in the attachments for a reproducer.
> The same data serialized to a file works fine (see the filesystem example in the reproducer).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)