GitHub user dguy opened a pull request:

    https://github.com/apache/kafka/pull/2051

    KAFKA-4311: Multi layer cache eviction causes forwarding to incorrect ProcessorNode

    Consider a topology like the one below. If a record arriving in `tableOne` causes a cache eviction, the eviction triggers the `leftJoin`, which does a `get` from `reducer-store`. If the key is not currently cached in `reducer-store` but is in the backing store, it is put into the cache, and that may also trigger an eviction. If it does, and the eldest entry is dirty, the cache flushes its dirty keys. It is at this point that a `ClassCastException` is thrown. This happens because the `ProcessorContext` is still set to the `leftJoin` node, so the flushed records are forwarded to the next child of `leftJoin` in the topology, `mapValues`, rather than to the children of the node that owns `reducer-store`.
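    To make the failure mode concrete, here is a heavily simplified, self-contained model of it. `Node`, `Context` and `naiveFlush` are hypothetical stand-ins, not Kafka Streams classes: the context holds a single mutable current node, and `forward` always targets that node's child. A flush that fires while `leftJoin` is current therefore lands on `mapValues`.

    ```java
    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical, heavily simplified model of the bug; Node and Context are
    // stand-ins for illustration, not Kafka Streams classes.
    public class WrongNodeDemo {

        // A processor node with a single downstream child.
        static final class Node {
            final String name;
            Node child;
            Node(String name) { this.name = name; }
        }

        // Shared processor context holding a mutable "current node".
        static final class Context {
            Node currentNode;
            final List<String> log = new ArrayList<>();

            // forward() sends to the child of whichever node is current.
            void forward(String key) {
                log.add(currentNode.child.name + " <- " + key);
            }
        }

        // Flush listener that forwards without setting the node it belongs to.
        static void naiveFlush(Context ctx, String key) {
            ctx.forward(key);
        }

        static List<String> run() {
            Node reduce = new Node("reduce");
            Node join = new Node("leftJoin");
            Node mapValues = new Node("mapValues");
            reduce.child = join;     // flushes from reducer-store should reach leftJoin
            join.child = mapValues;  // leftJoin's own output goes to mapValues

            Context ctx = new Context();
            ctx.currentNode = join;  // leftJoin is processing a record evicted from tableOne
            // Its get() on reducer-store evicts a dirty entry, and the flush is
            // forwarded while the context still points at leftJoin.
            naiveFlush(ctx, "k1");
            return ctx.log;
        }

        public static void main(String[] args) {
            System.out.println(run()); // prints [mapValues <- k1]; leftJoin was intended
        }
    }
    ```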
    We need to set the correct `ProcessorNode` on the context in the `ForwardingCacheFlushListener` before calling `context.forward`. We also need to remember to reset the `ProcessorNode` to the previous node once `context.forward` has completed.
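    The save/set/restore pattern described above can be sketched as follows, again with hypothetical stand-in types rather than the real `ForwardingCacheFlushListener` and `ProcessorContext` APIs. Wrapping `forward` in `try`/`finally` is a design choice in this sketch, so that the caller's node is restored even if a downstream processor throws.

    ```java
    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical sketch of the fix: stand-in types, not Kafka's real classes.
    public class FlushListenerSketch {

        static final class Node {
            final String name;
            Node child;
            Node(String name) { this.name = name; }
        }

        // Shared context; forward() targets the current node's child.
        static final class Context {
            Node currentNode;
            final List<String> log = new ArrayList<>();

            void forward(String key) {
                log.add(currentNode.child.name + " <- " + key);
            }
        }

        // Listener bound to the node whose store is being flushed.
        static final class ForwardingFlushListener {
            private final Context context;
            private final Node myNode;

            ForwardingFlushListener(Context context, Node myNode) {
                this.context = context;
                this.myNode = myNode;
            }

            void apply(String key) {
                final Node prev = context.currentNode; // remember the caller's node
                context.currentNode = myNode;          // forward as the owning node
                try {
                    context.forward(key);
                } finally {
                    context.currentNode = prev;        // restore even if forward throws
                }
            }
        }

        static List<String> run() {
            Node reduce = new Node("reduce");
            Node join = new Node("leftJoin");
            Node mapValues = new Node("mapValues");
            reduce.child = join;
            join.child = mapValues;

            Context ctx = new Context();
            ctx.currentNode = join; // leftJoin is mid-process when the eviction happens
            new ForwardingFlushListener(ctx, reduce).apply("k1");
            ctx.forward("joined");  // leftJoin then forwards its own result
            return ctx.log;
        }

        public static void main(String[] args) {
            System.out.println(run()); // prints [leftJoin <- k1, mapValues <- joined]
        }
    }
    ```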
    
    ```
    final KTable<String, String> one = builder.table(Serdes.String(), Serdes.String(), tableOne, tableOne);
    final KTable<Long, String> two = builder.table(Serdes.Long(), Serdes.String(), tableTwo, tableTwo);
    final KTable<String, Long> reduce = two.groupBy(new KeyValueMapper<Long, String, KeyValue<String, Long>>() {
        @Override
        public KeyValue<String, Long> apply(final Long key, final String value) {
            return new KeyValue<>(value, key);
        }
    }, Serdes.String(), Serdes.Long())
            .reduce(new Reducer<Long>() {..}, new Reducer<Long>() {..}, "reducer-store");

    one.leftJoin(reduce, new ValueJoiner<String, Long, String>() {..})
            .mapValues(new ValueMapper<String, String>() {..});
    ```

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dguy/kafka kafka-4311

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/2051.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2051
    
----
commit 5c8896a9d2aaca6bf5b9fb8de4de8140919fb280
Author: Damian Guy <damian....@gmail.com>
Date:   2016-10-21T11:27:17Z

    Save and set the current processor node in FlushingCacheListener.

----

