This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1acae2a  MINOR: Clean up ThreadCacheTest (#6485)
1acae2a is described below

commit 1acae2a67c8fce071bfe7a373187bdff209f1705
Author: Bill Bejeck <bbej...@gmail.com>
AuthorDate: Fri Mar 22 09:27:58 2019 -0400

    MINOR: Clean up ThreadCacheTest (#6485)
    
    Minor clean up ofThreadCacheTest
    Reviewers: Guozhang Wang <wangg...@gmail.com>, Matthias J. Sax 
<mj...@apache.org>
---
 .../streams/state/internals/ThreadCacheTest.java   | 96 +++++-----------------
 1 file changed, 22 insertions(+), 74 deletions(-)

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
index 5882ee4..c9c5789 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
@@ -146,7 +146,7 @@ public class ThreadCacheTest {
     }
 
     @Test
-    public void evict() throws IOException {
+    public void evict() {
         final List<KeyValue<String, String>> received = new ArrayList<>();
         final List<KeyValue<String, String>> expected = 
Collections.singletonList(
                 new KeyValue<>("K1", "V1"));
@@ -161,14 +161,10 @@ public class ThreadCacheTest {
         final ThreadCache cache = new ThreadCache(logContext,
                                                   
memoryCacheEntrySize(kv.key.getBytes(), kv.value.getBytes(), ""),
                                                   new MockStreamsMetrics(new 
Metrics()));
-        cache.addDirtyEntryFlushListener(namespace, new 
ThreadCache.DirtyEntryFlushListener() {
-            @Override
-            public void apply(final List<ThreadCache.DirtyEntry> dirty) {
-                for (final ThreadCache.DirtyEntry dirtyEntry : dirty) {
-                    received.add(new KeyValue<>(dirtyEntry.key().toString(), 
new String(dirtyEntry.newValue())));
-                }
+        cache.addDirtyEntryFlushListener(namespace, dirty -> {
+            for (final ThreadCache.DirtyEntry dirtyEntry : dirty) {
+                received.add(new KeyValue<>(dirtyEntry.key().toString(), new 
String(dirtyEntry.newValue())));
             }
-
         });
 
         for (final KeyValue<String, String> kvToInsert : toInsert) {
@@ -200,12 +196,7 @@ public class ThreadCacheTest {
         final Bytes key = Bytes.wrap(new byte[]{0});
         final ThreadCache cache = new ThreadCache(logContext, 10000L, new 
MockStreamsMetrics(new Metrics()));
         final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
-        cache.addDirtyEntryFlushListener(namespace, new 
ThreadCache.DirtyEntryFlushListener() {
-            @Override
-            public void apply(final List<ThreadCache.DirtyEntry> dirty) {
-                received.addAll(dirty);
-            }
-        });
+        cache.addDirtyEntryFlushListener(namespace, received::addAll);
         cache.put(namespace, key, dirtyEntry(key.get()));
         assertEquals(key.get(), cache.delete(namespace, key).value());
 
@@ -298,12 +289,8 @@ public class ThreadCacheTest {
     public void shouldSkipEntriesWhereValueHasBeenEvictedFromCache() {
         final int entrySize = memoryCacheEntrySize(new byte[1], new byte[1], 
"");
         final ThreadCache cache = new ThreadCache(logContext, entrySize * 5, 
new MockStreamsMetrics(new Metrics()));
-        cache.addDirtyEntryFlushListener(namespace, new 
ThreadCache.DirtyEntryFlushListener() {
-            @Override
-            public void apply(final List<ThreadCache.DirtyEntry> dirty) {
+        cache.addDirtyEntryFlushListener(namespace, dirty -> { });
 
-            }
-        });
         final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, 
{9}};
         for (int i = 0; i < 5; i++) {
             cache.put(namespace, Bytes.wrap(bytes[i]), dirtyEntry(bytes[i]));
@@ -322,12 +309,9 @@ public class ThreadCacheTest {
     public void shouldFlushDirtyEntriesForNamespace() {
         final ThreadCache cache = new ThreadCache(logContext, 100000, new 
MockStreamsMetrics(new Metrics()));
         final List<byte[]> received = new ArrayList<>();
-        cache.addDirtyEntryFlushListener(namespace1, new 
ThreadCache.DirtyEntryFlushListener() {
-            @Override
-            public void apply(final List<ThreadCache.DirtyEntry> dirty) {
-                for (final ThreadCache.DirtyEntry dirtyEntry : dirty) {
-                    received.add(dirtyEntry.key().get());
-                }
+        cache.addDirtyEntryFlushListener(namespace1, dirty -> {
+            for (final ThreadCache.DirtyEntry dirtyEntry : dirty) {
+                received.add(dirtyEntry.key().get());
             }
         });
         final List<byte[]> expected = Arrays.asList(new byte[]{0}, new 
byte[]{1}, new byte[]{2});
@@ -344,12 +328,9 @@ public class ThreadCacheTest {
     public void shouldNotFlushCleanEntriesForNamespace() {
         final ThreadCache cache = new ThreadCache(logContext, 100000, new 
MockStreamsMetrics(new Metrics()));
         final List<byte[]> received = new ArrayList<>();
-        cache.addDirtyEntryFlushListener(namespace1, new 
ThreadCache.DirtyEntryFlushListener() {
-            @Override
-            public void apply(final List<ThreadCache.DirtyEntry> dirty) {
-                for (final ThreadCache.DirtyEntry dirtyEntry : dirty) {
-                    received.add(dirtyEntry.key().get());
-                }
+        cache.addDirtyEntryFlushListener(namespace1, dirty -> {
+            for (final ThreadCache.DirtyEntry dirtyEntry : dirty) {
+                received.add(dirtyEntry.key().get());
             }
         });
         final List<byte[]> toInsert =  Arrays.asList(new byte[]{0}, new 
byte[]{1}, new byte[]{2});
@@ -366,12 +347,7 @@ public class ThreadCacheTest {
     private void shouldEvictImmediatelyIfCacheSizeIsZeroOrVerySmall(final 
ThreadCache cache) {
         final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
 
-        cache.addDirtyEntryFlushListener(namespace, new 
ThreadCache.DirtyEntryFlushListener() {
-            @Override
-            public void apply(final List<ThreadCache.DirtyEntry> dirty) {
-                received.addAll(dirty);
-            }
-        });
+        cache.addDirtyEntryFlushListener(namespace, received::addAll);
         cache.put(namespace, Bytes.wrap(new byte[]{0}), dirtyEntry(new 
byte[]{0}));
         assertEquals(1, received.size());
 
@@ -396,12 +372,7 @@ public class ThreadCacheTest {
     public void shouldEvictAfterPutAll() {
         final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
         final ThreadCache cache = new ThreadCache(logContext, 1, new 
MockStreamsMetrics(new Metrics()));
-        cache.addDirtyEntryFlushListener(namespace, new 
ThreadCache.DirtyEntryFlushListener() {
-            @Override
-            public void apply(final List<ThreadCache.DirtyEntry> dirty) {
-                received.addAll(dirty);
-            }
-        });
+        cache.addDirtyEntryFlushListener(namespace, received::addAll);
 
         cache.putAll(namespace, Arrays.asList(KeyValue.pair(Bytes.wrap(new 
byte[]{0}), dirtyEntry(new byte[]{5})),
                                               KeyValue.pair(Bytes.wrap(new 
byte[]{1}), dirtyEntry(new byte[]{6}))));
@@ -425,12 +396,7 @@ public class ThreadCacheTest {
     public void shouldNotForwardCleanEntryOnEviction() {
         final ThreadCache cache = new ThreadCache(logContext, 0, new 
MockStreamsMetrics(new Metrics()));
         final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
-        cache.addDirtyEntryFlushListener(namespace, new 
ThreadCache.DirtyEntryFlushListener() {
-            @Override
-            public void apply(final List<ThreadCache.DirtyEntry> dirty) {
-                received.addAll(dirty);
-            }
-        });
+        cache.addDirtyEntryFlushListener(namespace, received::addAll);
         cache.put(namespace, Bytes.wrap(new byte[]{1}), cleanEntry(new 
byte[]{0}));
         assertEquals(0, received.size());
     }
@@ -448,12 +414,7 @@ public class ThreadCacheTest {
     public void shouldEvictAfterPutIfAbsent() {
         final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
         final ThreadCache cache = new ThreadCache(logContext, 1, new 
MockStreamsMetrics(new Metrics()));
-        cache.addDirtyEntryFlushListener(namespace, new 
ThreadCache.DirtyEntryFlushListener() {
-            @Override
-            public void apply(final List<ThreadCache.DirtyEntry> dirty) {
-                received.addAll(dirty);
-            }
-        });
+        cache.addDirtyEntryFlushListener(namespace, received::addAll);
 
         cache.putIfAbsent(namespace, Bytes.wrap(new byte[]{0}), dirtyEntry(new 
byte[]{5}));
         cache.putIfAbsent(namespace, Bytes.wrap(new byte[]{1}), dirtyEntry(new 
byte[]{6}));
@@ -468,26 +429,13 @@ public class ThreadCacheTest {
         final int maxCacheSizeInBytes = 100;
         final ThreadCache threadCache = new ThreadCache(logContext, 
maxCacheSizeInBytes, new MockStreamsMetrics(new Metrics()));
         // trigger a put into another cache on eviction from "name"
-        threadCache.addDirtyEntryFlushListener(namespace, new 
ThreadCache.DirtyEntryFlushListener() {
-            @Override
-            public void apply(final List<ThreadCache.DirtyEntry> dirty) {
-                // put an item into an empty cache when the total cache size
-                // is already > than maxCacheSizeBytes
-                threadCache.put(namespace1, Bytes.wrap(new byte[]{0}), 
dirtyEntry(new byte[2]));
-            }
-        });
-        threadCache.addDirtyEntryFlushListener(namespace1, new 
ThreadCache.DirtyEntryFlushListener() {
-            @Override
-            public void apply(final List<ThreadCache.DirtyEntry> dirty) {
-               //
-            }
-        });
-        threadCache.addDirtyEntryFlushListener(namespace2, new 
ThreadCache.DirtyEntryFlushListener() {
-            @Override
-            public void apply(final List<ThreadCache.DirtyEntry> dirty) {
-
-            }
+        threadCache.addDirtyEntryFlushListener(namespace, dirty -> {
+            // put an item into an empty cache when the total cache size
+            // is already > than maxCacheSizeBytes
+            threadCache.put(namespace1, Bytes.wrap(new byte[]{0}), 
dirtyEntry(new byte[2]));
         });
+        threadCache.addDirtyEntryFlushListener(namespace1, dirty -> { });
+        threadCache.addDirtyEntryFlushListener(namespace2, dirty -> { });
 
         threadCache.put(namespace2, Bytes.wrap(new byte[]{1}), dirtyEntry(new 
byte[1]));
         threadCache.put(namespace, Bytes.wrap(new byte[]{1}), dirtyEntry(new 
byte[1]));

Reply via email to