[ 
https://issues.apache.org/jira/browse/FLINK-24591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Krzysztof Dziolak updated FLINK-24591:
--------------------------------------
    Description: 
We have identified an issue when using Kafka Producer with Flink 1.13 under `cluster.intercept-user-system-exit`:
{code:java}
java.lang.SecurityException: java.lang.SecurityException: setContextClassLoader
        at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at 
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at 
java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at 
java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
        at 
java.base/java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:600)
        at 
java.base/java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:678)
        at 
java.base/java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:737)
        at 
java.base/java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
        at 
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173)
        at 
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
        at 
java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
        at 
java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:661)
        at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.abortTransactions(FlinkKafkaProducer.java:1263)
        at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1189)
        at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
        at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
        at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
        at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:118)
        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:441)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:764)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:571)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.SecurityException: setContextClassLoader
        at 
java.base/java.util.concurrent.ForkJoinWorkerThread$InnocuousForkJoinWorkerThread.setContextClassLoader(ForkJoinWorkerThread.java:240)
        at 
org.apache.flink.util.TemporaryClassLoaderContext.of(TemporaryClassLoaderContext.java:61)
        at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.lambda$abortTransactions$3(FlinkKafkaProducer.java:1270)
        at 
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
        at 
java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1603)
        at 
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
        at 
java.base/java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
        at 
java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:746)
        at 
java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
        at 
java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
        at 
java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
        at 
java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
        at 
java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
{code}
Look in the attachments for a reproducer.

Same data serialized to a file works fine (look at the filesystem example in
the reproducer)

  was:
A user reported in the [mailing 
list|https://lists.apache.org/thread.html/re38a07f6121cc580737a20c11574719cfe554e58d99817f79db9bb4a%40%3Cuser.flink.apache.org%3E]
 that Avro deserialization fails when using Kafka, Avro and Confluent Schema 
Registry:  

{code:java}
Caused by: java.io.IOException: Failed to deserialize Avro record.
  at 
org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
  at 
org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
  
  at 
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
  at 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
  at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
 
  at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
  at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
  at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
  at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
  at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
Caused by: org.apache.avro.AvroTypeException: Found my.type.avro.MyEnumType, 
expecting union
  at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
  at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
  at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
  at 
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
  at 
org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298)
  at 
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
  at 
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
  at 
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
  at 
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
  at 
org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
  at 
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
  at 
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
  at 
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
  at 
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
  at 
org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:81)
  at 
org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
  ... 9 more
{code}

Look in the attachments for a reproducer.

Same data serialized to a file works fine (look the filesystem example in the 
reproducer) 



> Kafka Producer fails with SecurityException when using 
> cluster.intercept-user-system-exit
> -----------------------------------------------------------------------------------------
>
>                 Key: FLINK-24591
>                 URL: https://issues.apache.org/jira/browse/FLINK-24591
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.13.2
>            Reporter: Krzysztof Dziolak
>            Priority: Major
>
> We have identified an issue when using Kafka Producer with Flink 1.13 under `cluster.intercept-user-system-exit`:
> {code:java}
> java.lang.SecurityException: java.lang.SecurityException: setContextClassLoader
>       at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>       at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>       at 
> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>       at 
> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
>       at 
> java.base/java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:600)
>       at 
> java.base/java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:678)
>       at 
> java.base/java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:737)
>       at 
> java.base/java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
>       at 
> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173)
>       at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
>       at 
> java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
>       at 
> java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:661)
>       at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.abortTransactions(FlinkKafkaProducer.java:1263)
>       at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1189)
>       at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
>       at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
>       at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>       at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:118)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:441)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:764)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:571)
>       at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: java.lang.SecurityException: setContextClassLoader
>       at 
> java.base/java.util.concurrent.ForkJoinWorkerThread$InnocuousForkJoinWorkerThread.setContextClassLoader(ForkJoinWorkerThread.java:240)
>       at 
> org.apache.flink.util.TemporaryClassLoaderContext.of(TemporaryClassLoaderContext.java:61)
>       at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.lambda$abortTransactions$3(FlinkKafkaProducer.java:1270)
>       at 
> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>       at 
> java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1603)
>       at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
>       at 
> java.base/java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
>       at 
> java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:746)
>       at 
> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
>       at 
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
>       at 
> java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
>       at 
> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
>       at 
> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
> {code}
> Look in the attachments for a reproducer.
> Same data serialized to a file works fine (look at the filesystem example in
> the reproducer)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to