GitHub user dguy opened a pull request:
https://github.com/apache/kafka/pull/2051
KAFKA-4311: Multi layer cache eviction causes forwarding to incorrect
ProcessorNode
Given a topology like the one below. If a record arriving in `tableOne`
causes a cache eviction, it will trigger the `leftJoin` that will do a `get`
from `reducer-store`. If the key is not currently cached in `reducer-store`,
but is in the backing store, it will be put into the cache, and it may also
trigger an eviction. If it does trigger an eviction and the eldest entry is
dirty it will flush the dirty keys. It is at this point that a
ClassCastException is thrown. This occurs because the ProcessorContext is still
set to the context of the `leftJoin` and the next child in the topology is
`mapValues`.
We need to set the correct `ProcessorNode`, on the context, in the
`ForwardingCacheFlushListener` prior to calling `context.forward`. We also need
to set remember to reset the `ProcessorNode` to the previous node once
`context.forward` has completed.
```
final KTable one = builder.table(Serdes.String(),
Serdes.String(), tableOne, tableOne);
final KTable two = builder.table(Serdes.Long(),
Serdes.String(), tableTwo, tableTwo);
final KTable reduce = two.groupBy(new
KeyValueMapper>() {
@Override
public KeyValue apply(final Long key, final
String value) {
return new KeyValue<>(value, key);
}
}, Serdes.String(), Serdes.Long())
.reduce(new Reducer() {..}, new Reducer() {..},
"reducer-store");
one.leftJoin(reduce, new ValueJoiner() {..})
.mapValues(new ValueMapper() {..});
```
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/dguy/kafka kafka-4311
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/kafka/pull/2051.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2051
commit 5c8896a9d2aaca6bf5b9fb8de4de8140919fb280
Author: Damian Guy
Date: 2016-10-21T11:27:17Z
Save and set the current processor node in FlushingCacheListener.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---