This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 1acae2a MINOR: Clean up ThreadCacheTest (#6485) 1acae2a is described below commit 1acae2a67c8fce071bfe7a373187bdff209f1705 Author: Bill Bejeck <bbej...@gmail.com> AuthorDate: Fri Mar 22 09:27:58 2019 -0400 MINOR: Clean up ThreadCacheTest (#6485) Minor clean up of ThreadCacheTest Reviewers: Guozhang Wang <wangg...@gmail.com>, Matthias J. Sax <mj...@apache.org> --- .../streams/state/internals/ThreadCacheTest.java | 96 +++++----------------- 1 file changed, 22 insertions(+), 74 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java index 5882ee4..c9c5789 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java @@ -146,7 +146,7 @@ public class ThreadCacheTest { } @Test - public void evict() throws IOException { + public void evict() { final List<KeyValue<String, String>> received = new ArrayList<>(); final List<KeyValue<String, String>> expected = Collections.singletonList( new KeyValue<>("K1", "V1")); @@ -161,14 +161,10 @@ public class ThreadCacheTest { final ThreadCache cache = new ThreadCache(logContext, memoryCacheEntrySize(kv.key.getBytes(), kv.value.getBytes(), ""), new MockStreamsMetrics(new Metrics())); - cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() { - @Override - public void apply(final List<ThreadCache.DirtyEntry> dirty) { - for (final ThreadCache.DirtyEntry dirtyEntry : dirty) { - received.add(new KeyValue<>(dirtyEntry.key().toString(), new String(dirtyEntry.newValue()))); - } + cache.addDirtyEntryFlushListener(namespace, dirty -> { + for (final ThreadCache.DirtyEntry dirtyEntry : dirty) { + received.add(new KeyValue<>(dirtyEntry.key().toString(), new 
String(dirtyEntry.newValue()))); } - }); for (final KeyValue<String, String> kvToInsert : toInsert) { @@ -200,12 +196,7 @@ public class ThreadCacheTest { final Bytes key = Bytes.wrap(new byte[]{0}); final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new Metrics())); final List<ThreadCache.DirtyEntry> received = new ArrayList<>(); - cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() { - @Override - public void apply(final List<ThreadCache.DirtyEntry> dirty) { - received.addAll(dirty); - } - }); + cache.addDirtyEntryFlushListener(namespace, received::addAll); cache.put(namespace, key, dirtyEntry(key.get())); assertEquals(key.get(), cache.delete(namespace, key).value()); @@ -298,12 +289,8 @@ public class ThreadCacheTest { public void shouldSkipEntriesWhereValueHasBeenEvictedFromCache() { final int entrySize = memoryCacheEntrySize(new byte[1], new byte[1], ""); final ThreadCache cache = new ThreadCache(logContext, entrySize * 5, new MockStreamsMetrics(new Metrics())); - cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() { - @Override - public void apply(final List<ThreadCache.DirtyEntry> dirty) { + cache.addDirtyEntryFlushListener(namespace, dirty -> { }); - } - }); final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}}; for (int i = 0; i < 5; i++) { cache.put(namespace, Bytes.wrap(bytes[i]), dirtyEntry(bytes[i])); @@ -322,12 +309,9 @@ public class ThreadCacheTest { public void shouldFlushDirtyEntriesForNamespace() { final ThreadCache cache = new ThreadCache(logContext, 100000, new MockStreamsMetrics(new Metrics())); final List<byte[]> received = new ArrayList<>(); - cache.addDirtyEntryFlushListener(namespace1, new ThreadCache.DirtyEntryFlushListener() { - @Override - public void apply(final List<ThreadCache.DirtyEntry> dirty) { - for (final ThreadCache.DirtyEntry dirtyEntry : dirty) { - received.add(dirtyEntry.key().get()); - } + 
cache.addDirtyEntryFlushListener(namespace1, dirty -> { + for (final ThreadCache.DirtyEntry dirtyEntry : dirty) { + received.add(dirtyEntry.key().get()); } }); final List<byte[]> expected = Arrays.asList(new byte[]{0}, new byte[]{1}, new byte[]{2}); @@ -344,12 +328,9 @@ public class ThreadCacheTest { public void shouldNotFlushCleanEntriesForNamespace() { final ThreadCache cache = new ThreadCache(logContext, 100000, new MockStreamsMetrics(new Metrics())); final List<byte[]> received = new ArrayList<>(); - cache.addDirtyEntryFlushListener(namespace1, new ThreadCache.DirtyEntryFlushListener() { - @Override - public void apply(final List<ThreadCache.DirtyEntry> dirty) { - for (final ThreadCache.DirtyEntry dirtyEntry : dirty) { - received.add(dirtyEntry.key().get()); - } + cache.addDirtyEntryFlushListener(namespace1, dirty -> { + for (final ThreadCache.DirtyEntry dirtyEntry : dirty) { + received.add(dirtyEntry.key().get()); } }); final List<byte[]> toInsert = Arrays.asList(new byte[]{0}, new byte[]{1}, new byte[]{2}); @@ -366,12 +347,7 @@ public class ThreadCacheTest { private void shouldEvictImmediatelyIfCacheSizeIsZeroOrVerySmall(final ThreadCache cache) { final List<ThreadCache.DirtyEntry> received = new ArrayList<>(); - cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() { - @Override - public void apply(final List<ThreadCache.DirtyEntry> dirty) { - received.addAll(dirty); - } - }); + cache.addDirtyEntryFlushListener(namespace, received::addAll); cache.put(namespace, Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[]{0})); assertEquals(1, received.size()); @@ -396,12 +372,7 @@ public class ThreadCacheTest { public void shouldEvictAfterPutAll() { final List<ThreadCache.DirtyEntry> received = new ArrayList<>(); final ThreadCache cache = new ThreadCache(logContext, 1, new MockStreamsMetrics(new Metrics())); - cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() { - @Override - public void apply(final 
List<ThreadCache.DirtyEntry> dirty) { - received.addAll(dirty); - } - }); + cache.addDirtyEntryFlushListener(namespace, received::addAll); cache.putAll(namespace, Arrays.asList(KeyValue.pair(Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[]{5})), KeyValue.pair(Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[]{6})))); @@ -425,12 +396,7 @@ public class ThreadCacheTest { public void shouldNotForwardCleanEntryOnEviction() { final ThreadCache cache = new ThreadCache(logContext, 0, new MockStreamsMetrics(new Metrics())); final List<ThreadCache.DirtyEntry> received = new ArrayList<>(); - cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() { - @Override - public void apply(final List<ThreadCache.DirtyEntry> dirty) { - received.addAll(dirty); - } - }); + cache.addDirtyEntryFlushListener(namespace, received::addAll); cache.put(namespace, Bytes.wrap(new byte[]{1}), cleanEntry(new byte[]{0})); assertEquals(0, received.size()); } @@ -448,12 +414,7 @@ public class ThreadCacheTest { public void shouldEvictAfterPutIfAbsent() { final List<ThreadCache.DirtyEntry> received = new ArrayList<>(); final ThreadCache cache = new ThreadCache(logContext, 1, new MockStreamsMetrics(new Metrics())); - cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() { - @Override - public void apply(final List<ThreadCache.DirtyEntry> dirty) { - received.addAll(dirty); - } - }); + cache.addDirtyEntryFlushListener(namespace, received::addAll); cache.putIfAbsent(namespace, Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[]{5})); cache.putIfAbsent(namespace, Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[]{6})); @@ -468,26 +429,13 @@ public class ThreadCacheTest { final int maxCacheSizeInBytes = 100; final ThreadCache threadCache = new ThreadCache(logContext, maxCacheSizeInBytes, new MockStreamsMetrics(new Metrics())); // trigger a put into another cache on eviction from "name" - threadCache.addDirtyEntryFlushListener(namespace, new 
ThreadCache.DirtyEntryFlushListener() { - @Override - public void apply(final List<ThreadCache.DirtyEntry> dirty) { - // put an item into an empty cache when the total cache size - // is already > than maxCacheSizeBytes - threadCache.put(namespace1, Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[2])); - } - }); - threadCache.addDirtyEntryFlushListener(namespace1, new ThreadCache.DirtyEntryFlushListener() { - @Override - public void apply(final List<ThreadCache.DirtyEntry> dirty) { - // - } - }); - threadCache.addDirtyEntryFlushListener(namespace2, new ThreadCache.DirtyEntryFlushListener() { - @Override - public void apply(final List<ThreadCache.DirtyEntry> dirty) { - - } + threadCache.addDirtyEntryFlushListener(namespace, dirty -> { + // put an item into an empty cache when the total cache size + // is already > than maxCacheSizeBytes + threadCache.put(namespace1, Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[2])); }); + threadCache.addDirtyEntryFlushListener(namespace1, dirty -> { }); + threadCache.addDirtyEntryFlushListener(namespace2, dirty -> { }); threadCache.put(namespace2, Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[1])); threadCache.put(namespace, Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[1]));