[ 
https://issues.apache.org/jira/browse/KAFKA-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15595593#comment-15595593
 ] 

Frank Lyaruu commented on KAFKA-4311:
-------------------------------------

The ClassCast exception is gone, but I still see the Illegal state exception:

java.lang.IllegalStateException: Key found in dirty key set, but entry is null
        at 
org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:112)
        at 
org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:199)
        at 
org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:198)
        at 
org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:121)
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:147)
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:134)
        at 
org.apache.kafka.streams.kstream.internals.KTableReduce$KTableReduceProcessor.process(KTableReduce.java:74)
        at 
org.apache.kafka.streams.kstream.internals.KTableReduce$KTableReduceProcessor.process(KTableReduce.java:52)
        at 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:200)
        at 
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
        at 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:180)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:439)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:245)

and: 

org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to 
flush state store develop2_personsperteam3-personsperteam-person-develop2
        at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:331)
        at 
org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:180)
        at 
org.apache.kafka.streams.processor.internals.StreamThread$4.apply(StreamThread.java:372)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:331)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.flushAllState(StreamThread.java:368)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:304)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:272)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:255)
Caused by: java.lang.IllegalStateException: Key found in dirty key set, but 
entry is null
        at 
org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:112)
        at 
org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:100)
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:111)
        at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:329)

Again, when I set CACHE_MAX_BYTES_BUFFERING_CONFIG to 0, it runs fine.


> Multi layer cache eviction causes forwarding to incorrect ProcessorNode 
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-4311
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4311
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.1.0
>            Reporter: Damian Guy
>            Assignee: Damian Guy
>             Fix For: 0.10.1.1
>
>
> The two exceptions below were reported by Frank on the dev mailing list. 
> After investigation, the root cause is multiple cache evictions happening in 
> the same topology. 
> Given a topology like the one below. If a record arriving in `tableOne` 
> causes a cache eviction, it will trigger the `leftJoin` that will do a `get` 
> from `reducer-store`. If the key is not currently cached in `reducer-store`, 
> but is in the backing store, it will be put into the cache, and it may also 
> trigger an eviction. If it does trigger an eviction and the eldest entry is 
> dirty it will flush the dirty keys. It is at this point that the exception in 
> the comment happens (ClassCastException). This occurs because the 
> ProcessorContext is still set to the context of the `leftJoin` and the next 
> child in the topology is `mapValues`.
> We need to set the correct `ProcessorNode`, on the context,  in the 
> `ForwardingCacheFlushListener` prior to calling `context.forward`. We also 
> need to set remember to reset the `ProcessorNode` to the previous node once 
> `context.forward` has completed.
> {code}
>         final KTable<String, String> one = builder.table(Serdes.String(), 
> Serdes.String(), tableOne, tableOne);
>         final KTable<Long, String> two = builder.table(Serdes.Long(), 
> Serdes.String(), tableTwo, tableTwo);
>         final KTable<String, Long> reduce = two.groupBy(new 
> KeyValueMapper<Long, String, KeyValue<String, Long>>() {
>             @Override
>             public KeyValue<String, Long> apply(final Long key, final String 
> value) {
>                 return new KeyValue<>(value, key);
>             }
>         }, Serdes.String(), Serdes.Long())
>                 .reduce(new Reducer<Long>() {
>                     @Override
>                     public Long apply(final Long value1, final Long value2) {
>                         return value1 + value2;
>                     }
>                 }, new Reducer<Long>() {
>                     @Override
>                     public Long apply(final Long value1, final Long value2) {
>                         return value1 - value2;
>                     }
>                 }, "reducer-store");
>     one.leftJoin(reduce, new ValueJoiner<String, Long, String>() {
>             @Override
>             public String apply(final String value1, final Long value2) {
>                 return value1 + ":" + value2;
>             }
>         })
>         .mapValues(new ValueMapper<String, String>() {
>                     @Override
>                     public String apply(final String value) {
>                         return value;
>                     }
>                 });
> {code}
> This exception is actually a symptom of the exception reported below in the 
> comment. After the first exception is thrown, the StreamThread triggers a 
> shutdown that then throws this exception.
> [StreamThread-1] ERROR
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [StreamThread-1] Failed to close state manager for StreamTask 0_0:
> org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed
> to close state store addr-organization
> at
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:342)
> at
> org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:121)
> at
> org.apache.kafka.streams.processor.internals.StreamThread$2.apply(StreamThread.java:341)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:322)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.closeAllStateManagers(StreamThread.java:338)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:299)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:262)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:245)
> Caused by: java.lang.IllegalStateException: Key found in dirty key set, but
> entry is null
> at
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:112)
> at
> org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:100)
> at
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:111)
> at
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(CachingKeyValueStore.java:117)
> at
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:340)
> ... 7 more



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to