GitHub user rhauch opened a pull request: https://github.com/apache/kafka/pull/256
KAFKA-2594 Added InMemoryLRUCacheStore

Added a new `KeyValueStore` implementation called `InMemoryLRUCacheStore` that keeps a maximum number of entries in-memory; when the size exceeds the capacity, the least-recently used entry is removed from the store and the backing topic. Also added unit tests for this new store and the existing `InMemoryKeyValueStore` and `RocksDBKeyValueStore` implementations. A new `KeyValueStoreTestDriver` class simplifies all of the other tests, and can be used by other libraries to help test their own custom implementations.

This PR depends upon [KAFKA-2593](https://issues.apache.org/jira/browse/KAFKA-2593) and its PR at https://github.com/apache/kafka/pull/255. Once that PR is merged, I can rebase this PR if desired.

Two issues were uncovered when creating these new unit tests, and both are also addressed as separate (small) commits in this PR:

* The `RocksDBKeyValueStore` initialization was not creating the file system directory if missing.
* `MeteredKeyValueStore` was casting to `ProcessorContextImpl` to access the `RecordCollector`, which prevented using `MeteredKeyValueStore` implementations in tests where something other than `ProcessorContextImpl` was used. The fix was to introduce a `RecordCollector.Supplier` interface to define this `recordCollector()` method, and change `ProcessorContextImpl` and `MockProcessorContext` to both implement this interface. Now, `MeteredKeyValueStore` can cast to the new interface to access the record collector rather than to a single concrete implementation, making it possible to use any and all current stores inside unit tests.
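For readers who want a concrete picture of the eviction behavior described at the top of this description, here is a minimal sketch. It is not the code in this PR: it assumes an access-ordered `java.util.LinkedHashMap`, and the `LruSketch` class and its `onEvict` callback (standing in for whatever removes the entry from the backing topic) are hypothetical names used only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

/**
 * Hypothetical illustration of LRU eviction; not the InMemoryLRUCacheStore
 * class added by this PR.
 */
class LruSketch<K, V> {
    private final Map<K, V> entries;

    /**
     * @param maxEntries maximum number of entries kept in memory
     * @param onEvict    assumed hook invoked with each evicted key, e.g. to
     *                   forward a delete to a backing topic
     */
    LruSketch(final int maxEntries, final Consumer<K> onEvict) {
        // accessOrder = true makes iteration order run from the least
        // recently accessed entry to the most recently accessed one.
        this.entries = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                // Called by LinkedHashMap after each insertion; returning
                // true drops the least recently used entry.
                if (size() > maxEntries) {
                    onEvict.accept(eldest.getKey());
                    return true;
                }
                return false;
            }
        };
    }

    /** Reading a key counts as a use and moves it to the "recent" end. */
    V get(K key) {
        return entries.get(key);
    }

    /** Inserting beyond capacity evicts the least recently used entry. */
    void put(K key, V value) {
        entries.put(key, value);
    }

    int size() {
        return entries.size();
    }
}
```

With a capacity of 2, putting `a` and `b`, reading `a`, and then putting `c` evicts `b`, because `a` was touched more recently. The sketch is not thread-safe and ignores serdes and metering, which the real stores handle separately.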
You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rhauch/kafka kafka-2594

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/256.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #256

----
commit d7e25f55dcbac1b64cfb0a8f642ce55078bd7d03
Author: Randall Hauch <rha...@gmail.com>
Date:   2015-09-28T18:34:51Z

    KAFKA-2593 Key value stores can use custom serializers and deserializers

    Add support for the key value stores to use specified serializers and deserializers (aka, "serdes"). Prior to this change, the stores were limited to only the default serdes specified in the topology's configuration and exposed to the processors via the ProcessorContext.

    Now, using InMemoryKeyValueStore and RocksDBKeyValueStore is similar: both are parameterized on the key and value types, and both have multiple similar static factory methods. The static factory methods either take explicit key and value serdes, take key and value class types so the serdes can be inferred (only for the built-in serdes for string, integer, long, and byte array types), or use the default serdes on the ProcessorContext.

commit 3b903d31503fcce7cf2b9607b68893a02425dacf
Author: Randall Hauch <rha...@gmail.com>
Date:   2015-09-28T19:11:55Z

    KAFKA-2594: Corrected RocksDBKeyValueStore to properly create directory if missing

commit 33e4fd2b8bb56783d62f55e223757e8c479212ed
Author: Randall Hauch <rha...@gmail.com>
Date:   2015-09-28T19:15:24Z

    KAFKA-2594: MeteredKeyValueStore no longer casts to ProcessorContextImpl

    The cast was used to access ProcessorContextImpl.recordCollector(), and the cast prevented using MeteredKeyValueStore implementations in tests where something other than ProcessorContextImpl was used.
    Introduced a RecordCollector.Supplier interface to define this `recordCollector()` method, and changed ProcessorContextImpl and MockProcessorContext to both implement this interface. Now, MeteredKeyValueStore can cast to the interface to access the record collector rather than to a single concrete implementation, making it possible to use the stores inside unit tests.

commit ff5e779fa1d6955bf0bf633191edd9d7a7cb9a9c
Author: Randall Hauch <rha...@gmail.com>
Date:   2015-09-28T19:26:16Z

    KAFKA-2594 Added InMemoryLRUCacheStore

    Added a new KeyValueStore implementation that keeps a maximum number of entries in-memory; when the size exceeds the capacity, the least-recently used entry is removed from the store and the backing topic. Also added unit tests for this new store and the existing InMemoryKeyValueStore and RocksDBKeyValueStore implementations. A new KeyValueStoreTestDriver class simplifies all of the other tests.

----