[ https://issues.apache.org/jira/browse/KAFKA-6667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax resolved KAFKA-6667.
------------------------------------
    Resolution: Invalid

> Issues in Kafka Streams when using encryption with custom serializers and 
> deserializers
> -----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6667
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6667
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.11.0.1
>            Reporter: Amandeep Singh
>            Priority: Major
>
> I am using Kafka Streams to do some aggregations. The data comes in at a very 
> rapid rate on my source topics.
> I am using custom serialization and deserialization classes that 
> encrypt/decrypt the bytes written to Kafka. Everything works fine when the 
> serializers and deserializers do not encrypt.
> The encryption used is AES-256.
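>
> For context, the custom classes wrap the regular serializer/deserializer and 
> encrypt/decrypt the resulting bytes. A minimal sketch of what the encrypting 
> side looks like (the class name, the choice of AES/GCM, and the IV layout are 
> illustrative assumptions, not the actual code from my application):
> {code:java}
> import java.security.SecureRandom;
> import java.util.Map;
> import javax.crypto.Cipher;
> import javax.crypto.SecretKey;
> import javax.crypto.spec.GCMParameterSpec;
> import org.apache.kafka.common.serialization.Serializer;
>
> /** Illustrative sketch: encrypts the inner serializer's output with AES/GCM. */
> public class AesEncryptingSerializer<T> implements Serializer<T> {
>
>     private final Serializer<T> inner;   // the "real" serializer being wrapped
>     private final SecretKey key;         // 256-bit AES key supplied by the caller
>     private final SecureRandom random = new SecureRandom();
>
>     public AesEncryptingSerializer(Serializer<T> inner, SecretKey key) {
>         this.inner = inner;
>         this.key = key;
>     }
>
>     @Override
>     public void configure(Map<String, ?> configs, boolean isKey) {
>         inner.configure(configs, isKey);
>     }
>
>     @Override
>     public byte[] serialize(String topic, T data) {
>         try {
>             byte[] plain = inner.serialize(topic, data);
>             byte[] iv = new byte[12];                 // fresh IV per record
>             random.nextBytes(iv);
>             Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
>             cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(128, iv));
>             byte[] encrypted = cipher.doFinal(plain);
>             // Prepend the IV so the matching deserializer can recover it.
>             byte[] out = new byte[iv.length + encrypted.length];
>             System.arraycopy(iv, 0, out, 0, iv.length);
>             System.arraycopy(encrypted, 0, out, iv.length, encrypted.length);
>             return out;
>         } catch (Exception e) {
>             throw new RuntimeException("AES encryption failed for topic " + topic, e);
>         }
>     }
>
>     @Override
>     public void close() {
>         inner.close();
>     }
> }
> {code}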
> This is the stack trace with encryption enabled. I am not able to figure out 
> at which step it breaks.
> Exception in thread "prod-streamer-recording-partitioner-test-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=2_0, processor=KSTREAM-SOURCE-0000000009, topic=recording-aggregator-partitioner-temp-analyze_recording-aggregation-repartition, partition=0, offset=16971
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:202)
>     at org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:342)
>     at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>     at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:334)
>     at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:624)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:513)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
> Caused by: org.apache.kafka.streams.errors.StreamsException: task [2_0] exception caught when producing
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:136)
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:87)
>     at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59)
>     at org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.put(ChangeLoggingSegmentedBytesStore.java:59)
>     at org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.put(MeteredSegmentedBytesStore.java:112)
>     at org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowBytesStore.put(RocksDBWindowStore.java:43)
>     at org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowBytesStore.put(RocksDBWindowStore.java:34)
>     at org.apache.kafka.streams.state.internals.CachingWindowStore$1.apply(CachingWindowStore.java:95)
>     at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
>     at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:232)
>     at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:247)
>     at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:155)
>     at org.apache.kafka.streams.state.internals.CachingWindowStore.put(CachingWindowStore.java:148)
>     at org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:111)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:45)
>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:131)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:188)
>     ... 7 more
> Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 59787680 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
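>
> Note: encryption typically adds only a small constant per-record overhead (IV 
> plus auth tag), so the ~59 MB record above is the aggregated payload itself; 
> the fix is configuration rather than the serializer. A minimal sketch of 
> raising the limit for the producer Streams creates internally (the 64 MB 
> value and the bootstrap address are illustrative; the broker-side 
> message.max.bytes must allow the same size):
> {code:java}
> import java.util.Properties;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.streams.StreamsConfig;
>
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "prod-streamer-recording-partitioner-test");
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> // Forward max.request.size to the internal producer via the producer prefix.
> props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), 64 * 1024 * 1024);
> {code}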



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
