[ 
https://issues.apache.org/jira/browse/KAFKA-16355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17824796#comment-17824796
 ] 

PoAn Yang commented on KAFKA-16355:
-----------------------------------

Hi [~mimaison], I am interested in this issue. May I take it? Thank you.

> ConcurrentModificationException in 
> InMemoryTimeOrderedKeyValueBuffer.evictWhile
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-16355
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16355
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.5.1
>            Reporter: Mickael Maison
>            Priority: Major
>
> While a Streams application was restoring its state after an outage, it hit 
> the following exception:
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=0_16, processor=KSTREAM-SOURCE-0000000000, topic=<TOPIC>, 
> partition=16, offset=454875695, 
> stacktrace=java.util.ConcurrentModificationException
> at java.base/java.util.TreeMap$PrivateEntryIterator.remove(TreeMap.java:1507)
> at 
> org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.evictWhile(InMemoryTimeOrderedKeyValueBuffer.java:423)
> at 
> org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessorSupplier$KTableSuppressProcessor.enforceConstraints(KTableSuppressProcessorSupplier.java:178)
> at 
> org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessorSupplier$KTableSuppressProcessor.process(KTableSuppressProcessorSupplier.java:165)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at 
> org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)
> at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$setFlushListener$4(MeteredWindowStore.java:181)
> at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.putAndMaybeForward(CachingWindowStore.java:124)
> at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.lambda$initInternal$0(CachingWindowStore.java:99)
> at 
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:158)
> at 
> org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:252)
> at 
> org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:302)
> at 
> org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:179)
> at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.put(CachingWindowStore.java:173)
> at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.put(CachingWindowStore.java:47)
> at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$put$5(MeteredWindowStore.java:201)
> at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
> at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:200)
> at 
> org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$WindowStoreReadWriteDecorator.put(AbstractReadWriteDecorator.java:201)
> at 
> org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:138)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:215)
> at 
> org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:159)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:215)
> at 
> org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:159)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:810)
> at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:810)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:741)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1747)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:807)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:767)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1747)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:807)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
> The affected application is running 3.5.1, but 
> InMemoryTimeOrderedKeyValueBuffer.evictWhile() does not appear to have 
> changed much since then, so the same exception may still occur in later 
> versions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to