[ 
https://issues.apache.org/jira/browse/KAFKA-13195?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13195:
------------------------------------
    Issue Type: Improvement  (was: Bug)

> StateSerde don't honor DeserializationExceptionHandler
> ------------------------------------------------------
>
>                 Key: KAFKA-13195
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13195
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 2.8.0
>            Reporter: Ludo
>            Priority: Major
>
> Kafka Streams allows you to configure a 
> [DeserializationExceptionHandler|https://docs.confluent.io/platform/current/streams/faq.html#failure-and-exception-handling].
>  
> When you use a StateStore, most messages in the internal topic will be copies 
> of the original message and will usually use the same serializer, even when 
> the message is of another type. 
> You can see 
> [here|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java#L159-L161]
>  that StateSerdes uses the raw Deserializer and does not honor 
> {{StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG}}.
> This crashes the application (the exception reaches the handler set via the 
> {{setUncaughtExceptionHandler}} method).
> I think the state store should behave the same way as the 
> {{RecordDeserializer}} and honor the DeserializationExceptionHandler.
>  
> Stack trace (from Kafka Streams 2.6.1):
>  
> {code:java}
> Uncaught exception in Kafka Stream kestra_executor-StreamThread-1, closing 
> !Uncaught exception in Kafka Stream kestra_executor-StreamThread-1, closing 
> !org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=1_14, processor=workertaskjoined-repartition-source, 
> topic=kestra_executor-workertaskjoined-repartition, partition=14, 
> offset=167500, 
> stacktrace=org.apache.kafka.common.errors.SerializationException: 
> com.fasterxml.jackson.databind.exc.InvalidFormatException: Cannot deserialize 
> value of type `io.kestra.plugin.gcp.bigquery.ExtractToGcs$Format` from String 
> "txt": not one of the values accepted for Enum class: 
> [NEWLINE_DELIMITED_JSON, AVRO, PARQUET, CSV] at [Source: 
> (byte[])"{....[truncated 1270 bytes]; line: 1, column: 1187] (through 
> reference chain: 
> io.kestra.core.models.flows.Flow["tasks"]->java.util.ArrayList[1]->org.kestra.task.gcp.bigquery.ExtractToGcs["format"])
>  at 
> com.fasterxml.jackson.databind.exc.InvalidFormatException.from(InvalidFormatException.java:67)
>  at 
> com.fasterxml.jackson.databind.DeserializationContext.weirdStringException(DeserializationContext.java:1851)
>  at 
> com.fasterxml.jackson.databind.DeserializationContext.handleWeirdStringValue(DeserializationContext.java:1079)
>  at 
> com.fasterxml.jackson.databind.deser.std.EnumDeserializer._deserializeAltString(EnumDeserializer.java:327)
>  at 
> com.fasterxml.jackson.databind.deser.std.EnumDeserializer._fromString(EnumDeserializer.java:214)
>  at 
> com.fasterxml.jackson.databind.deser.std.EnumDeserializer.deserialize(EnumDeserializer.java:188)
>  at 
> com.fasterxml.jackson.databind.deser.impl.FieldProperty.deserializeAndSet(FieldProperty.java:138)
>  at 
> com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:324)
>  at 
> com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:225)
>  at 
> com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:197)
>  at 
> com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer._deserializeTypedForId(AsPropertyTypeDeserializer.java:137)
>  at 
> com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer.deserializeTypedFromObject(AsPropertyTypeDeserializer.java:107)
>  at 
> com.fasterxml.jackson.databind.deser.AbstractDeserializer.deserializeWithType(AbstractDeserializer.java:263)
>  at 
> com.fasterxml.jackson.databind.deser.std.CollectionDeserializer._deserializeFromArray(CollectionDeserializer.java:357)
>  at 
> com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:244)
>  at 
> com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:28)
>  at 
> com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:542)
>  at 
> com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeWithErrorWrapping(BeanDeserializer.java:565)
>  at 
> com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:449)
>  at 
> com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1405)
>  at 
> com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:362)
>  at 
> com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:195)
>  at 
> com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:322)
>  at 
> com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4593)
>  at 
> com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3609) 
> at 
> io.kestra.runner.kafka.serializers.JsonDeserializer.deserialize(JsonDeserializer.java:32)
>  at 
> org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.deserialize(ValueAndTimestampDeserializer.java:57)
>  at 
> org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.deserialize(ValueAndTimestampDeserializer.java:30)
>  at 
> org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:160) at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.outerValue(MeteredKeyValueStore.java:216)
>  at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueIterator.next(MeteredKeyValueStore.java:254)
>  at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueIterator.next(MeteredKeyValueStore.java:231)
>  at java.base/java.util.Iterator.forEachRemaining(Unknown Source) at 
> java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Unknown 
> Source) at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown 
> Source) at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source) 
> at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(Unknown 
> Source) at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown 
> Source) at java.base/java.util.stream.ReferencePipeline.collect(Unknown 
> Source) at 
> io.kestra.core.services.FlowService.keepLastVersionCollector(FlowService.java:57)
>  at io.kestra.core.services.FlowService.keepLastVersion(FlowService.java:37) 
> at 
> io.kestra.runner.kafka.streams.FlowWithTriggerTransformer.transform(FlowWithTriggerTransformer.java:39)
>  at 
> io.kestra.runner.kafka.streams.FlowWithTriggerTransformer.transform(FlowWithTriggerTransformer.java:18)
>  at 
> org.apache.kafka.streams.kstream.internals.KStreamFlatTransform$KStreamFlatTransformProcessor.process(KStreamFlatTransform.java:64)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>  at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
>  at 
> org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>  at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
>  at 
> org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:64)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>  at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
>  at 
> org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>  at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
>  at 
> org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:64)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>  at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
>  at 
> org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:79)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>  at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
>  at 
> org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>  at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
>  at 
> org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>  at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
>  at 
> org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:64)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>  at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
>  at 
> org.apache.kafka.streams.kstream.internals.PassThrough$PassThroughProcessor.process(PassThrough.java:33)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>  at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
>  at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:85)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:678)
>  at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:678)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1034)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:695)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1034)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
> {code}
>  
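
The failure mode in the description can be illustrated with a small, self-contained sketch of one possible application-side workaround: wrapping the value deserializer so that a corrupt record yields null instead of an exception. This is not the fix proposed in the ticket and not part of Kafka Streams. The local Deserializer interface is a stand-in for org.apache.kafka.common.serialization.Deserializer so the example compiles without Kafka on the classpath, and SkipOnErrorDeserializer, ExtractFormat, and FormatDeserializer are hypothetical names chosen to mirror the enum failure in the stack trace.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Stand-in for org.apache.kafka.common.serialization.Deserializer (assumption:
// declared locally only so this sketch has no Kafka dependency).
interface Deserializer<T> {
    T deserialize(String topic, byte[] data);
}

// Hypothetical wrapper: delegates to the real deserializer and converts any
// runtime failure into a null value, recording the error for later inspection.
final class SkipOnErrorDeserializer<T> implements Deserializer<T> {
    private final Deserializer<T> delegate;
    private final List<String> failures = new ArrayList<>();

    SkipOnErrorDeserializer(Deserializer<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        try {
            return delegate.deserialize(topic, data);
        } catch (RuntimeException e) {
            // Keep a trace of what was skipped instead of failing the caller.
            failures.add(topic + ": " + e.getMessage());
            return null;
        }
    }

    List<String> failures() {
        return failures;
    }
}

// Hypothetical enum with the accepted values listed in the stack trace.
enum ExtractFormat { NEWLINE_DELIMITED_JSON, AVRO, PARQUET, CSV }

// Hypothetical strict deserializer: throws IllegalArgumentException on unknown names.
final class FormatDeserializer implements Deserializer<ExtractFormat> {
    @Override
    public ExtractFormat deserialize(String topic, byte[] data) {
        return ExtractFormat.valueOf(new String(data, StandardCharsets.UTF_8));
    }
}

final class SkipOnErrorDemo {
    public static void main(String[] args) {
        SkipOnErrorDeserializer<ExtractFormat> safe =
            new SkipOnErrorDeserializer<>(new FormatDeserializer());
        // A known value passes through; an unknown one is skipped as null.
        System.out.println(safe.deserialize("store-changelog", "CSV".getBytes(StandardCharsets.UTF_8)));
        System.out.println(safe.deserialize("store-changelog", "txt".getBytes(StandardCharsets.UTF_8)));
        System.out.println(safe.failures().size());
    }
}
```

With such a wrapper, code that reads from the store would need to tolerate null values, and a skipped record can no longer be told apart from a missing one, which is one reason handling this inside the state store, as the ticket requests, may be preferable.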



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
