[ https://issues.apache.org/jira/browse/KAFKA-13195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17399290#comment-17399290 ]
Matthias J. Sax edited comment on KAFKA-13195 at 8/15/21, 1:41 AM:
-------------------------------------------------------------------

It's actually on purpose that the deserialization handler is only used for input records: it allows users to skip over (drop) corrupted input messages (i.e., poison pills). For the state store, a deserialization error indicates corrupted state, and we should not continue processing.

How do you end up with corrupted state? Not sure if I understand. When data is written into the state store, the configured store serde is used, and because the serde can serialize the data, it should also be able to deserialize it.


> StateSerde don't honor DeserializationExceptionHandler
> ------------------------------------------------------
>
>                 Key: KAFKA-13195
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13195
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.0
>            Reporter: Ludo
>            Priority: Major
>
> Kafka Streams allows configuring a [DeserializationExceptionHandler|https://docs.confluent.io/platform/current/streams/faq.html#failure-and-exception-handling].
> When you use a StateStore, most messages will be copies of the original messages in an internal topic, and they will mostly use the same serializer even if the message is of another type.
> You can see [here|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java#L159-L161] that StateSerdes uses the raw Deserializer and does not honor {{StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG}}. This crashes the application (reaching the {{setUncaughtExceptionHandler}} method).
> I think the state store should have the same behavior as the {{RecordDeserializer}} and honor the DeserializationExceptionHandler.
>
> Stacktrace (coming from Kafka Streams 2.6.1):
>
> {code:java}
> Uncaught exception in Kafka Stream kestra_executor-StreamThread-1, closing !
> org.apache.kafka.streams.errors.StreamsException: Exception caught in process.
> taskId=1_14, processor=workertaskjoined-repartition-source, topic=kestra_executor-workertaskjoined-repartition, partition=14, offset=167500,
> stacktrace=org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.databind.exc.InvalidFormatException: Cannot deserialize value of type `io.kestra.plugin.gcp.bigquery.ExtractToGcs$Format` from String "txt": not one of the values accepted for Enum class: [NEWLINE_DELIMITED_JSON, AVRO, PARQUET, CSV] at [Source: (byte[])"{....[truncated 1270 bytes]; line: 1, column: 1187] (through reference chain: io.kestra.core.models.flows.Flow["tasks"]->java.util.ArrayList[1]->org.kestra.task.gcp.bigquery.ExtractToGcs["format"])
>     at com.fasterxml.jackson.databind.exc.InvalidFormatException.from(InvalidFormatException.java:67)
>     at com.fasterxml.jackson.databind.DeserializationContext.weirdStringException(DeserializationContext.java:1851)
>     at com.fasterxml.jackson.databind.DeserializationContext.handleWeirdStringValue(DeserializationContext.java:1079)
>     at com.fasterxml.jackson.databind.deser.std.EnumDeserializer._deserializeAltString(EnumDeserializer.java:327)
>     at com.fasterxml.jackson.databind.deser.std.EnumDeserializer._fromString(EnumDeserializer.java:214)
>     at com.fasterxml.jackson.databind.deser.std.EnumDeserializer.deserialize(EnumDeserializer.java:188)
>     at com.fasterxml.jackson.databind.deser.impl.FieldProperty.deserializeAndSet(FieldProperty.java:138)
>     at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:324)
>     at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:225)
>     at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:197)
>     at com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer._deserializeTypedForId(AsPropertyTypeDeserializer.java:137)
>     at com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer.deserializeTypedFromObject(AsPropertyTypeDeserializer.java:107)
>     at com.fasterxml.jackson.databind.deser.AbstractDeserializer.deserializeWithType(AbstractDeserializer.java:263)
>     at com.fasterxml.jackson.databind.deser.std.CollectionDeserializer._deserializeFromArray(CollectionDeserializer.java:357)
>     at com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:244)
>     at com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:28)
>     at com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:542)
>     at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeWithErrorWrapping(BeanDeserializer.java:565)
>     at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:449)
>     at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1405)
>     at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:362)
>     at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:195)
>     at com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:322)
>     at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4593)
>     at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3609)
>     at io.kestra.runner.kafka.serializers.JsonDeserializer.deserialize(JsonDeserializer.java:32)
>     at org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.deserialize(ValueAndTimestampDeserializer.java:57)
>     at org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.deserialize(ValueAndTimestampDeserializer.java:30)
>     at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:160)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.outerValue(MeteredKeyValueStore.java:216)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueIterator.next(MeteredKeyValueStore.java:254)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueIterator.next(MeteredKeyValueStore.java:231)
>     at java.base/java.util.Iterator.forEachRemaining(Unknown Source)
>     at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Unknown Source)
>     at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
>     at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
>     at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(Unknown Source)
>     at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
>     at java.base/java.util.stream.ReferencePipeline.collect(Unknown Source)
>     at io.kestra.core.services.FlowService.keepLastVersionCollector(FlowService.java:57)
>     at io.kestra.core.services.FlowService.keepLastVersion(FlowService.java:37)
>     at io.kestra.runner.kafka.streams.FlowWithTriggerTransformer.transform(FlowWithTriggerTransformer.java:39)
>     at io.kestra.runner.kafka.streams.FlowWithTriggerTransformer.transform(FlowWithTriggerTransformer.java:18)
>     at org.apache.kafka.streams.kstream.internals.KStreamFlatTransform$KStreamFlatTransformProcessor.process(KStreamFlatTransform.java:64)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>     at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
>     at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>     at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
>     at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:64)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>     at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
>     at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>     at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
>     at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:64)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>     at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
>     at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:79)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>     at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
>     at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>     at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
>     at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>     at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
>     at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:64)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>     at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
>     at org.apache.kafka.streams.kstream.internals.PassThrough$PassThroughProcessor.process(PassThrough.java:33)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>     at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:85)
>     at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:678)
>     at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:678)
>     at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1034)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:695)
>     at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1034)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
> {code}


--
This message was sent by Atlassian Jira
(v8.3.4#803005)
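
For context on the handler discussed above: it is registered via {{StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG}} and is consulted only when records are deserialized from input topics, not when values are read back through {{StateSerdes}}. Below is a minimal, illustrative sketch (not taken from the ticket; the class name {{SkipCorruptedRecordsHandler}} and the property values are made-up placeholders) of what such a handler looks like in the Kafka Streams 2.x API.

{code:java}
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;

// Illustrative handler: drops corrupted input records (poison pills) instead of
// failing the StreamThread. As noted in the comment above, this path is only hit
// for input-topic records; deserialization via StateSerdes bypasses the handler.
public class SkipCorruptedRecordsHandler implements DeserializationExceptionHandler {

    @Override
    public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {
        // CONTINUE skips the record; returning FAIL would stop processing instead.
        return DeserializationHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // no additional configuration needed for this sketch
    }

    // How the handler would be wired into a Streams application (placeholder values).
    public static Properties exampleConfig() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // The built-in org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
        // could be used here instead of the custom class.
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                SkipCorruptedRecordsHandler.class);
        return props;
    }
}
{code}

As the stack trace shows ({{StateSerdes.valueFrom}} invoking the deserializer directly), a handler configured this way does not cover the state-store read path that this ticket is about.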