[GitHub] kafka pull request #2051: KAFKA-4311: Multi layer cache eviction causes forw...

2016-11-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2051


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #2051: KAFKA-4311: Multi layer cache eviction causes forw...

2016-10-21 Thread dguy
GitHub user dguy opened a pull request:

https://github.com/apache/kafka/pull/2051

KAFKA-4311: Multi layer cache eviction causes forwarding to incorrect 
ProcessorNode

Given a topology like the one below, if a record arriving in `tableOne` 
causes a cache eviction, it triggers the `leftJoin`, which does a `get` 
from `reducer-store`. If the key is not currently cached in `reducer-store` 
but is in the backing store, it is put into the cache, which may in turn 
trigger an eviction. If it does and the eldest entry is dirty, the dirty 
keys are flushed. It is at this point that a ClassCastException is thrown. 
This occurs because the ProcessorContext is still set to the context of the 
`leftJoin`, whose next child in the topology is `mapValues`, so the flushed 
record is forwarded to `mapValues` rather than to the child of the reduce 
node.
We need to set the correct `ProcessorNode` on the context in the 
`ForwardingCacheFlushListener` prior to calling `context.forward`. We also 
need to remember to reset the `ProcessorNode` to the previous node once 
`context.forward` has completed.

```
final KTable<String, String> one =
    builder.table(Serdes.String(), Serdes.String(), tableOne, tableOne);
final KTable<Long, String> two =
    builder.table(Serdes.Long(), Serdes.String(), tableTwo, tableTwo);
final KTable<String, Long> reduce = two
    .groupBy(new KeyValueMapper<Long, String, KeyValue<String, Long>>() {
        @Override
        public KeyValue<String, Long> apply(final Long key, final String value) {
            return new KeyValue<>(value, key);
        }
    }, Serdes.String(), Serdes.Long())
    .reduce(new Reducer<Long>() {..}, new Reducer<Long>() {..}, "reducer-store");

one.leftJoin(reduce, new ValueJoiner() {..})
    .mapValues(new ValueMapper() {..});
```
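
The save/set/restore step described above can be sketched in isolation. 
This is a minimal illustration, not the code in the patch: `Node`, 
`Context`, `ForwardingFlushListener` and the `reduceChild` node are 
hypothetical stand-ins for the Streams internals, and only the shape of the 
fix (remember the current node, switch to the listener's own node, forward, 
restore in a `finally`) is taken from the description.

```java
import java.util.ArrayList;
import java.util.List;

public class NodeRestoreSketch {

    /** Hypothetical stand-in for a processor node; it records what it receives. */
    static class Node {
        final String name;
        final List<Node> children = new ArrayList<>();
        final List<String> received = new ArrayList<>();

        Node(final String name) {
            this.name = name;
        }

        void process(final String key, final Object value) {
            received.add(key + "=" + value);
        }
    }

    /** Hypothetical stand-in for the processor context. */
    static class Context {
        private Node currentNode;

        Node currentNode() {
            return currentNode;
        }

        void setCurrentNode(final Node node) {
            currentNode = node;
        }

        /** Forwards to the children of whichever node is current at call time. */
        void forward(final String key, final Object value) {
            for (final Node child : currentNode.children) {
                child.process(key, value);
            }
        }
    }

    /** Hypothetical flush listener that owns the node whose cache is being flushed. */
    static class ForwardingFlushListener {
        private final Context context;
        private final Node myNode;

        ForwardingFlushListener(final Context context, final Node myNode) {
            this.context = context;
            this.myNode = myNode;
        }

        void apply(final String key, final Object value) {
            final Node previous = context.currentNode(); // save the caller's node
            context.setCurrentNode(myNode);              // forward from our own node
            try {
                context.forward(key, value);
            } finally {
                context.setCurrentNode(previous);        // always restore
            }
        }
    }

    public static void main(final String[] args) {
        final Node leftJoin = new Node("leftJoin");
        final Node mapValues = new Node("mapValues");
        final Node reduce = new Node("reduce");
        final Node reduceChild = new Node("reduceChild");
        leftJoin.children.add(mapValues);
        reduce.children.add(reduceChild);

        final Context context = new Context();
        context.setCurrentNode(leftJoin); // a flush fires while leftJoin is processing

        new ForwardingFlushListener(context, reduce).apply("a", 1L);

        System.out.println("mapValues received: " + mapValues.received);
        System.out.println("reduceChild received: " + reduceChild.received);
        System.out.println("current node: " + context.currentNode().name);
    }
}
```

Restoring in a `finally` block means the `leftJoin` continues with its own 
node set even if forwarding the flushed record throws.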

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dguy/kafka kafka-4311

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2051.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2051


commit 5c8896a9d2aaca6bf5b9fb8de4de8140919fb280
Author: Damian Guy 
Date:   2016-10-21T11:27:17Z

Save and set the current processor node in FlushingCacheListener.



