This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6e7149b MINOR: code cleanup (#6055)
6e7149b is described below
commit 6e7149b77a10ac6aa4da2edd549f468015170236
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Jan 9 15:03:28 2019 +0100
MINOR: code cleanup (#6055)
Reviewers: Bill Bejeck <[email protected]>, John Roesler <[email protected]>, Guozhang Wang <[email protected]>
---
.../internals/CompositeRestoreListenerTest.java | 6 +-
.../internals/GlobalStreamThreadTest.java | 67 ++--
.../internals/ProcessorStateManagerTest.java | 4 +-
.../processor/internals/StateRestorerTest.java | 20 +-
.../internals/StoreChangelogReaderTest.java | 297 +++++++++++----
.../processor/internals/StreamThreadTest.java | 419 +++++++++++----------
6 files changed, 515 insertions(+), 298 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java
index 5bfa4a6..5d7078c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java
@@ -26,7 +26,7 @@ import org.apache.kafka.test.MockBatchingStateRestoreListener;
import org.apache.kafka.test.MockStateRestoreListener;
import org.junit.Test;
-import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
@@ -46,8 +46,8 @@ public class CompositeRestoreListenerTest {
noListenBatchingStateRestoreCallback =
new MockNoListenBatchingStateRestoreCallback();
private final MockStateRestoreListener reportingStoreListener = new
MockStateRestoreListener();
- private final byte[] key = "key".getBytes(Charset.forName("UTF-8"));
- private final byte[] value = "value".getBytes(Charset.forName("UTF-8"));
+ private final byte[] key = "key".getBytes(StandardCharsets.UTF_8);
+ private final byte[] value = "value".getBytes(StandardCharsets.UTF_8);
private final Collection<KeyValue<byte[], byte[]>> records =
Collections.singletonList(KeyValue.pair(key, value));
private final Collection<ConsumerRecord<byte[], byte[]>> consumerRecords =
Collections.singletonList(
new ConsumerRecord<>("", 0, 0L, key, value)
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
index 4e6023f..c0e0de3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -36,7 +36,6 @@ import
org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.MockStateRestoreListener;
-import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.Before;
import org.junit.Test;
@@ -163,7 +162,7 @@ public class GlobalStreamThreadTest {
}
@Test(timeout = 30000)
- public void shouldStopRunningWhenClosedByUser() throws InterruptedException {
+ public void shouldStopRunningWhenClosedByUser() throws Exception {
initializeConsumer();
globalStreamThread.start();
globalStreamThread.shutdown();
@@ -172,7 +171,7 @@ public class GlobalStreamThreadTest {
}
@Test
- public void shouldCloseStateStoresOnClose() throws InterruptedException {
+ public void shouldCloseStateStoresOnClose() throws Exception {
initializeConsumer();
globalStreamThread.start();
final StateStore globalStore =
builder.globalStateStores().get(GLOBAL_STORE_NAME);
@@ -184,7 +183,7 @@ public class GlobalStreamThreadTest {
@SuppressWarnings("unchecked")
@Test
- public void shouldTransitionToDeadOnClose() throws InterruptedException {
+ public void shouldTransitionToDeadOnClose() throws Exception {
initializeConsumer();
globalStreamThread.start();
globalStreamThread.shutdown();
@@ -195,7 +194,7 @@ public class GlobalStreamThreadTest {
@SuppressWarnings("unchecked")
@Test
- public void shouldStayDeadAfterTwoCloses() throws InterruptedException {
+ public void shouldStayDeadAfterTwoCloses() throws Exception {
initializeConsumer();
globalStreamThread.start();
globalStreamThread.shutdown();
@@ -207,15 +206,15 @@ public class GlobalStreamThreadTest {
@SuppressWarnings("unchecked")
@Test
- public void shouldTransitionToRunningOnStart() throws InterruptedException {
+ public void shouldTransitionToRunningOnStart() throws Exception {
initializeConsumer();
globalStreamThread.start();
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return globalStreamThread.state() == RUNNING;
- }
- }, 10 * 1000, "Thread never started.");
+
+ TestUtils.waitForCondition(
+ () -> globalStreamThread.state() == RUNNING,
+ 10 * 1000,
+ "Thread never started.");
+
globalStreamThread.shutdown();
}
@@ -223,22 +222,19 @@ public class GlobalStreamThreadTest {
public void shouldDieOnInvalidOffsetException() throws Exception {
initializeConsumer();
globalStreamThread.start();
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return globalStreamThread.state() == RUNNING;
- }
- }, 10 * 1000, "Thread never started.");
+
+ TestUtils.waitForCondition(
+ () -> globalStreamThread.state() == RUNNING,
+ 10 * 1000,
+ "Thread never started.");
mockConsumer.updateEndOffsets(Collections.singletonMap(topicPartition,
1L));
mockConsumer.addRecord(new ConsumerRecord<>(GLOBAL_STORE_TOPIC_NAME,
0, 0L, "K1".getBytes(), "V1".getBytes()));
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return mockConsumer.position(topicPartition) == 1L;
- }
- }, 10 * 1000, "Input record never consumed");
+ TestUtils.waitForCondition(
+ () -> mockConsumer.position(topicPartition) == 1L,
+ 10 * 1000,
+ "Input record never consumed");
mockConsumer.setException(new InvalidOffsetException("Try Again!") {
@Override
@@ -249,20 +245,21 @@ public class GlobalStreamThreadTest {
// feed first record for recovery
mockConsumer.addRecord(new ConsumerRecord<>(GLOBAL_STORE_TOPIC_NAME,
0, 0L, "K1".getBytes(), "V1".getBytes()));
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return globalStreamThread.state() == DEAD;
- }
- }, 10 * 1000, "GlobalStreamThread should have died.");
+ TestUtils.waitForCondition(
+ () -> globalStreamThread.state() == DEAD,
+ 10 * 1000,
+ "GlobalStreamThread should have died.");
}
private void initializeConsumer() {
- mockConsumer.updatePartitions(GLOBAL_STORE_TOPIC_NAME,
Collections.singletonList(new PartitionInfo(GLOBAL_STORE_TOPIC_NAME,
- 0,
- null,
- new Node[0],
- new Node[0])));
+ mockConsumer.updatePartitions(
+ GLOBAL_STORE_TOPIC_NAME,
+ Collections.singletonList(new PartitionInfo(
+ GLOBAL_STORE_TOPIC_NAME,
+ 0,
+ null,
+ new Node[0],
+ new Node[0])));
mockConsumer.updateBeginningOffsets(Collections.singletonMap(topicPartition,
0L));
mockConsumer.updateEndOffsets(Collections.singletonMap(topicPartition,
0L));
mockConsumer.assign(Collections.singleton(topicPartition));
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 475b5e9..90f2420 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -41,7 +41,7 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
-import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -82,7 +82,7 @@ public class ProcessorStateManagerTest {
private final MockChangelogReader changelogReader = new
MockChangelogReader();
private final MockKeyValueStore mockKeyValueStore = new
MockKeyValueStore(storeName, true);
private final byte[] key = new byte[]{0x0, 0x0, 0x0, 0x1};
- private final byte[] value = "the-value".getBytes(Charset.forName("UTF-8"));
+ private final byte[] value = "the-value".getBytes(StandardCharsets.UTF_8);
private final ConsumerRecord<byte[], byte[]> consumerRecord = new
ConsumerRecord<>(changelogTopic, 0, 0, key, value);
private final LogContext logContext = new
LogContext("process-state-manager-test ");
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
index dc22bb4..3fa4b1e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
@@ -35,8 +35,13 @@ public class StateRestorerTest {
private final MockRestoreCallback callback = new MockRestoreCallback();
private final MockStateRestoreListener reportingListener = new
MockStateRestoreListener();
private final CompositeRestoreListener compositeRestoreListener = new
CompositeRestoreListener(callback);
- private final StateRestorer restorer = new StateRestorer(new
TopicPartition("topic", 1), compositeRestoreListener,
- null,
OFFSET_LIMIT, true, "storeName");
+ private final StateRestorer restorer = new StateRestorer(
+ new TopicPartition("topic", 1),
+ compositeRestoreListener,
+ null,
+ OFFSET_LIMIT,
+ true,
+ "storeName");
@Before
public void setUp() {
@@ -66,10 +71,13 @@ public class StateRestorerTest {
@Test
public void shouldBeCompletedIfOffsetAndOffsetLimitAreZero() {
- final StateRestorer
- restorer =
- new StateRestorer(new TopicPartition("topic", 1),
compositeRestoreListener, null, 0, true,
- "storeName");
+ final StateRestorer restorer = new StateRestorer(
+ new TopicPartition("topic", 1),
+ compositeRestoreListener,
+ null,
+ 0,
+ true,
+ "storeName");
assertTrue(restorer.hasCompleted(0, 10));
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index d08f0d7..ebe50c0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -73,7 +73,11 @@ public class StoreChangelogReaderTest {
private final StateRestoreListener stateRestoreListener = new
MockStateRestoreListener();
private final TopicPartition topicPartition = new TopicPartition("topic",
0);
private final LogContext logContext = new LogContext("test-reader ");
- private final StoreChangelogReader changelogReader = new
StoreChangelogReader(consumer, Duration.ZERO, stateRestoreListener, logContext);
+ private final StoreChangelogReader changelogReader = new StoreChangelogReader(
+ consumer,
+ Duration.ZERO,
+ stateRestoreListener,
+ logContext);
@Before
public void setUp() {
@@ -91,8 +95,18 @@ public class StoreChangelogReaderTest {
}
};
- final StoreChangelogReader changelogReader = new
StoreChangelogReader(consumer, Duration.ZERO, stateRestoreListener, logContext);
- changelogReader.register(new StateRestorer(topicPartition,
restoreListener, null, Long.MAX_VALUE, true, "storeName"));
+ final StoreChangelogReader changelogReader = new StoreChangelogReader(
+ consumer,
+ Duration.ZERO,
+ stateRestoreListener,
+ logContext);
+ changelogReader.register(new StateRestorer(
+ topicPartition,
+ restoreListener,
+ null,
+ Long.MAX_VALUE,
+ true,
+ "storeName"));
changelogReader.restore(active);
assertTrue(functionCalled.get());
}
@@ -123,7 +137,13 @@ public class StoreChangelogReaderTest {
public void shouldRestoreAllMessagesFromBeginningWhenCheckpointNull() {
final int messages = 10;
setupConsumer(messages, topicPartition);
- changelogReader.register(new StateRestorer(topicPartition,
restoreListener, null, Long.MAX_VALUE, true, "storeName"));
+ changelogReader.register(new StateRestorer(
+ topicPartition,
+ restoreListener,
+ null,
+ Long.MAX_VALUE,
+ true,
+ "storeName"));
expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
replay(active, task);
changelogReader.restore(active);
@@ -141,8 +161,13 @@ public class StoreChangelogReaderTest {
return Collections.singleton(topicPartition);
}
});
- changelogReader.register(new StateRestorer(topicPartition,
restoreListener, null, Long.MAX_VALUE, true,
- "storeName"));
+ changelogReader.register(new StateRestorer(
+ topicPartition,
+ restoreListener,
+ null,
+ Long.MAX_VALUE,
+ true,
+ "storeName"));
EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
EasyMock.replay(active, task);
@@ -150,8 +175,13 @@ public class StoreChangelogReaderTest {
// first restore call "fails" but we should not die with an exception
assertEquals(0, changelogReader.restore(active).size());
- changelogReader.register(new StateRestorer(topicPartition,
restoreListener, null, Long.MAX_VALUE, true,
- "storeName"));
+ changelogReader.register(new StateRestorer(
+ topicPartition,
+ restoreListener,
+ null,
+ Long.MAX_VALUE,
+ true,
+ "storeName"));
// retry restore should succeed
assertEquals(1, changelogReader.restore(active).size());
assertThat(callback.restored.size(), equalTo(messages));
@@ -167,7 +197,7 @@ public class StoreChangelogReaderTest {
consumer.updateEndOffsets(Collections.singletonMap(topicPartition,
(long) (messages + startOffset)));
addRecords(messages, topicPartition, startOffset);
- consumer.assign(Collections.<TopicPartition>emptyList());
+ consumer.assign(Collections.emptyList());
final StateRestorer stateRestorer = new StateRestorer(
topicPartition,
@@ -198,8 +228,13 @@ public class StoreChangelogReaderTest {
public void shouldRestoreMessagesFromCheckpoint() {
final int messages = 10;
setupConsumer(messages, topicPartition);
- changelogReader.register(new StateRestorer(topicPartition,
restoreListener, 5L, Long.MAX_VALUE, true,
- "storeName"));
+ changelogReader.register(new StateRestorer(
+ topicPartition,
+ restoreListener,
+ 5L,
+ Long.MAX_VALUE,
+ true,
+ "storeName"));
changelogReader.restore(active);
assertThat(callback.restored.size(), equalTo(5));
@@ -209,8 +244,13 @@ public class StoreChangelogReaderTest {
public void shouldClearAssignmentAtEndOfRestore() {
final int messages = 1;
setupConsumer(messages, topicPartition);
- changelogReader.register(new StateRestorer(topicPartition,
restoreListener, null, Long.MAX_VALUE, true,
- "storeName"));
+ changelogReader.register(new StateRestorer(
+ topicPartition,
+ restoreListener,
+ null,
+ Long.MAX_VALUE,
+ true,
+ "storeName"));
expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
replay(active, task);
changelogReader.restore(active);
@@ -220,8 +260,13 @@ public class StoreChangelogReaderTest {
@Test
public void shouldRestoreToLimitWhenSupplied() {
setupConsumer(10, topicPartition);
- final StateRestorer restorer = new StateRestorer(topicPartition,
restoreListener, null, 3, true,
- "storeName");
+ final StateRestorer restorer = new StateRestorer(
+ topicPartition,
+ restoreListener,
+ null,
+ 3,
+ true,
+ "storeName");
changelogReader.register(restorer);
expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
replay(active, task);
@@ -242,9 +287,27 @@ public class StoreChangelogReaderTest {
setupConsumer(5, one);
setupConsumer(3, two);
- changelogReader.register(new StateRestorer(topicPartition,
restoreListener, null, Long.MAX_VALUE, true, "storeName1"));
- changelogReader.register(new StateRestorer(one, restoreListener1,
null, Long.MAX_VALUE, true, "storeName2"));
- changelogReader.register(new StateRestorer(two, restoreListener2,
null, Long.MAX_VALUE, true, "storeName3"));
+ changelogReader.register(new StateRestorer(
+ topicPartition,
+ restoreListener,
+ null,
+ Long.MAX_VALUE,
+ true,
+ "storeName1"));
+ changelogReader.register(new StateRestorer(
+ one,
+ restoreListener1,
+ null,
+ Long.MAX_VALUE,
+ true,
+ "storeName2"));
+ changelogReader.register(new StateRestorer(
+ two,
+ restoreListener2,
+ null,
+ Long.MAX_VALUE,
+ true,
+ "storeName3"));
expect(active.restoringTaskFor(one)).andStubReturn(task);
expect(active.restoringTaskFor(two)).andStubReturn(task);
@@ -258,7 +321,7 @@ public class StoreChangelogReaderTest {
}
@Test
- public void shouldRestoreAndNotifyMultipleStores() throws Exception {
+ public void shouldRestoreAndNotifyMultipleStores() {
final TopicPartition one = new TopicPartition("one", 0);
final TopicPartition two = new TopicPartition("two", 0);
final MockStateRestoreListener callbackOne = new
MockStateRestoreListener();
@@ -269,10 +332,27 @@ public class StoreChangelogReaderTest {
setupConsumer(5, one);
setupConsumer(3, two);
- changelogReader
- .register(new StateRestorer(topicPartition, restoreListener, 0L,
Long.MAX_VALUE, true, "storeName1"));
- changelogReader.register(new StateRestorer(one, restoreListener1, 0L,
Long.MAX_VALUE, true, "storeName2"));
- changelogReader.register(new StateRestorer(two, restoreListener2, 0L,
Long.MAX_VALUE, true, "storeName3"));
+ changelogReader.register(new StateRestorer(
+ topicPartition,
+ restoreListener,
+ 0L,
+ Long.MAX_VALUE,
+ true,
+ "storeName1"));
+ changelogReader.register(new StateRestorer(
+ one,
+ restoreListener1,
+ 0L,
+ Long.MAX_VALUE,
+ true,
+ "storeName2"));
+ changelogReader.register(new StateRestorer(
+ two,
+ restoreListener2,
+ 0L,
+ Long.MAX_VALUE,
+ true,
+ "storeName3"));
expect(active.restoringTaskFor(one)).andReturn(task);
expect(active.restoringTaskFor(two)).andReturn(task);
@@ -300,8 +380,13 @@ public class StoreChangelogReaderTest {
@Test
public void shouldOnlyReportTheLastRestoredOffset() {
setupConsumer(10, topicPartition);
- changelogReader
- .register(new StateRestorer(topicPartition, restoreListener, 0L,
5, true, "storeName1"));
+ changelogReader.register(new StateRestorer(
+ topicPartition,
+ restoreListener,
+ 0L,
+ 5,
+ true,
+ "storeName1"));
expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
replay(active, task);
changelogReader.restore(active);
@@ -311,7 +396,6 @@ public class StoreChangelogReaderTest {
assertCorrectOffsetsReportedByListener(callback, 0L, 4L, 5L);
}
-
private void assertAllCallbackStatesExecuted(final
MockStateRestoreListener restoreListener,
final String storeName) {
assertThat(restoreListener.storeNameCalledStates.get(RESTORE_START),
equalTo(storeName));
@@ -319,7 +403,6 @@ public class StoreChangelogReaderTest {
assertThat(restoreListener.storeNameCalledStates.get(RESTORE_END),
equalTo(storeName));
}
-
private void assertCorrectOffsetsReportedByListener(final
MockStateRestoreListener restoreListener,
final long startOffset,
final long batchOffset,
@@ -332,9 +415,13 @@ public class StoreChangelogReaderTest {
@Test
public void shouldNotRestoreAnythingWhenPartitionIsEmpty() {
- final StateRestorer
- restorer =
- new StateRestorer(topicPartition, restoreListener, null,
Long.MAX_VALUE, true, "storeName");
+ final StateRestorer restorer = new StateRestorer(
+ topicPartition,
+ restoreListener,
+ null,
+ Long.MAX_VALUE,
+ true,
+ "storeName");
setupConsumer(0, topicPartition);
changelogReader.register(restorer);
@@ -345,11 +432,15 @@ public class StoreChangelogReaderTest {
@Test
public void shouldNotRestoreAnythingWhenCheckpointAtEndOffset() {
- final Long endOffset = 10L;
+ final long endOffset = 10L;
setupConsumer(endOffset, topicPartition);
- final StateRestorer
- restorer =
- new StateRestorer(topicPartition, restoreListener, endOffset,
Long.MAX_VALUE, true, "storeName");
+ final StateRestorer restorer = new StateRestorer(
+ topicPartition,
+ restoreListener,
+ endOffset,
+ Long.MAX_VALUE,
+ true,
+ "storeName");
changelogReader.register(restorer);
@@ -361,7 +452,13 @@ public class StoreChangelogReaderTest {
@Test
public void shouldReturnRestoredOffsetsForPersistentStores() {
setupConsumer(10, topicPartition);
- changelogReader.register(new StateRestorer(topicPartition,
restoreListener, null, Long.MAX_VALUE, true, "storeName"));
+ changelogReader.register(new StateRestorer(
+ topicPartition,
+ restoreListener,
+ null,
+ Long.MAX_VALUE,
+ true,
+ "storeName"));
expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
replay(active, task);
changelogReader.restore(active);
@@ -373,12 +470,18 @@ public class StoreChangelogReaderTest {
@Test
public void shouldNotReturnRestoredOffsetsForNonPersistentStore() {
setupConsumer(10, topicPartition);
- changelogReader.register(new StateRestorer(topicPartition,
restoreListener, null, Long.MAX_VALUE, false, "storeName"));
+ changelogReader.register(new StateRestorer(
+ topicPartition,
+ restoreListener,
+ null,
+ Long.MAX_VALUE,
+ false,
+ "storeName"));
expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
replay(active, task);
changelogReader.restore(active);
final Map<TopicPartition, Long> restoredOffsets =
changelogReader.restoredOffsets();
- assertThat(restoredOffsets, equalTo(Collections.<TopicPartition,
Long>emptyMap()));
+ assertThat(restoredOffsets, equalTo(Collections.emptyMap()));
}
@Test
@@ -386,11 +489,16 @@ public class StoreChangelogReaderTest {
assignPartition(3, topicPartition);
final byte[] bytes = new byte[0];
consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(),
topicPartition.partition(), 0, bytes, bytes));
- consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(),
topicPartition.partition(), 1, (byte[]) null, bytes));
+ consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(),
topicPartition.partition(), 1, null, bytes));
consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(),
topicPartition.partition(), 2, bytes, bytes));
consumer.assign(Collections.singletonList(topicPartition));
- changelogReader.register(new StateRestorer(topicPartition,
restoreListener, null, Long.MAX_VALUE, false,
- "storeName"));
+ changelogReader.register(new StateRestorer(
+ topicPartition,
+ restoreListener,
+ null,
+ Long.MAX_VALUE,
+ false,
+ "storeName"));
expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
replay(active, task);
changelogReader.restore(active);
@@ -402,7 +510,13 @@ public class StoreChangelogReaderTest {
public void shouldCompleteImmediatelyWhenEndOffsetIs0() {
final Collection<TopicPartition> expected =
Collections.singleton(topicPartition);
setupConsumer(0, topicPartition);
- changelogReader.register(new StateRestorer(topicPartition,
restoreListener, null, Long.MAX_VALUE, true, "store"));
+ changelogReader.register(new StateRestorer(
+ topicPartition,
+ restoreListener,
+ null,
+ Long.MAX_VALUE,
+ true,
+ "store"));
expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
replay(active, task);
@@ -417,7 +531,13 @@ public class StoreChangelogReaderTest {
setupConsumer(1, topicPartition);
consumer.updateEndOffsets(Collections.singletonMap(topicPartition,
10L));
- changelogReader.register(new StateRestorer(topicPartition,
restoreListener, null, Long.MAX_VALUE, false, "storeName"));
+ changelogReader.register(new StateRestorer(
+ topicPartition,
+ restoreListener,
+ null,
+ Long.MAX_VALUE,
+ false,
+ "storeName"));
final TopicPartition postInitialization = new TopicPartition("other",
0);
expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
@@ -432,7 +552,13 @@ public class StoreChangelogReaderTest {
consumer.updateBeginningOffsets(Collections.singletonMap(postInitialization,
0L));
consumer.updateEndOffsets(Collections.singletonMap(postInitialization,
3L));
- changelogReader.register(new StateRestorer(postInitialization,
restoreListener2, null, Long.MAX_VALUE, false, "otherStore"));
+ changelogReader.register(new StateRestorer(
+ postInitialization,
+ restoreListener2,
+ null,
+ Long.MAX_VALUE,
+ false,
+ "otherStore"));
final Collection<TopicPartition> expected =
Utils.mkSet(topicPartition, postInitialization);
consumer.assign(expected);
@@ -442,7 +568,6 @@ public class StoreChangelogReaderTest {
assertThat(callbackTwo.restored.size(), equalTo(3));
}
-
@Test
public void
shouldNotThrowTaskMigratedExceptionIfSourceTopicUpdatedDuringRestoreProcess() {
final int messages = 10;
@@ -450,7 +575,13 @@ public class StoreChangelogReaderTest {
// in this case first call to endOffsets returns correct value, but a
second thread has updated the source topic
// but since it's a source topic, the second check should not fire
hence no exception
consumer.addEndOffsets(Collections.singletonMap(topicPartition, 15L));
- changelogReader.register(new StateRestorer(topicPartition,
restoreListener, null, 9L, true, "storeName"));
+ changelogReader.register(new StateRestorer(
+ topicPartition,
+ restoreListener,
+ null,
+ 9L,
+ true,
+ "storeName"));
expect(active.restoringTaskFor(topicPartition)).andReturn(task);
replay(active);
@@ -458,7 +589,6 @@ public class StoreChangelogReaderTest {
changelogReader.restore(active);
}
-
@Test
public void
shouldNotThrowTaskMigratedExceptionDuringRestoreForChangelogTopicWhenEndOffsetNotExceededEOSEnabled()
{
final int totalMessages = 10;
@@ -466,7 +596,13 @@ public class StoreChangelogReaderTest {
// records have offsets of 0..9 10 is commit marker so 11 is end offset
consumer.updateEndOffsets(Collections.singletonMap(topicPartition,
11L));
- changelogReader.register(new StateRestorer(topicPartition,
restoreListener, null, Long.MAX_VALUE, true, "storeName"));
+ changelogReader.register(new StateRestorer(
+ topicPartition,
+ restoreListener,
+ null,
+ Long.MAX_VALUE,
+ true,
+ "storeName"));
expect(active.restoringTaskFor(topicPartition)).andReturn(task);
replay(active);
@@ -475,13 +611,18 @@ public class StoreChangelogReaderTest {
assertThat(callback.restored.size(), equalTo(10));
}
-
@Test
public void
shouldNotThrowTaskMigratedExceptionDuringRestoreForChangelogTopicWhenEndOffsetNotExceededEOSDisabled()
{
final int totalMessages = 10;
setupConsumer(totalMessages, topicPartition);
- changelogReader.register(new StateRestorer(topicPartition,
restoreListener, null, Long.MAX_VALUE, true, "storeName"));
+ changelogReader.register(new StateRestorer(
+ topicPartition,
+ restoreListener,
+ null,
+ Long.MAX_VALUE,
+ true,
+ "storeName"));
expect(active.restoringTaskFor(topicPartition)).andReturn(task);
replay(active);
@@ -494,7 +635,13 @@ public class StoreChangelogReaderTest {
public void
shouldNotThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestoreForSourceTopic()
{
final int messages = 10;
setupConsumer(messages, topicPartition);
- changelogReader.register(new StateRestorer(topicPartition,
restoreListener, null, 5, true, "storeName"));
+ changelogReader.register(new StateRestorer(
+ topicPartition,
+ restoreListener,
+ null,
+ 5,
+ true,
+ "storeName"));
expect(active.restoringTaskFor(topicPartition)).andReturn(task);
replay(active);
@@ -508,7 +655,13 @@ public class StoreChangelogReaderTest {
final int messages = 10;
setupConsumer(messages, topicPartition);
- changelogReader.register(new StateRestorer(topicPartition,
restoreListener, null, 10, true, "storeName"));
+ changelogReader.register(new StateRestorer(
+ topicPartition,
+ restoreListener,
+ null,
+ 10,
+ true,
+ "storeName"));
expect(active.restoringTaskFor(topicPartition)).andReturn(task);
replay(active);
@@ -525,11 +678,17 @@ public class StoreChangelogReaderTest {
addRecords(5, topicPartition, 0);
//EOS enabled so commit marker at offset 5 so records start at 6
addRecords(5, topicPartition, 6);
- consumer.assign(Collections.<TopicPartition>emptyList());
+ consumer.assign(Collections.emptyList());
// commit marker is 5 so ending offset is 12
consumer.updateEndOffsets(Collections.singletonMap(topicPartition,
12L));
- changelogReader.register(new StateRestorer(topicPartition,
restoreListener, null, 6, true, "storeName"));
+ changelogReader.register(new StateRestorer(
+ topicPartition,
+ restoreListener,
+ null,
+ 6,
+ true,
+ "storeName"));
expect(active.restoringTaskFor(topicPartition)).andReturn(task);
replay(active);
@@ -545,7 +704,13 @@ public class StoreChangelogReaderTest {
// records have offsets 0..9 10 is commit marker so 11 is ending offset
consumer.updateEndOffsets(Collections.singletonMap(topicPartition,
11L));
- changelogReader.register(new StateRestorer(topicPartition,
restoreListener, null, 11, true, "storeName"));
+ changelogReader.register(new StateRestorer(
+ topicPartition,
+ restoreListener,
+ null,
+ 11,
+ true,
+ "storeName"));
expect(active.restoringTaskFor(topicPartition)).andReturn(task);
replay(active);
@@ -558,26 +723,32 @@ public class StoreChangelogReaderTest {
final TopicPartition topicPartition) {
assignPartition(messages, topicPartition);
addRecords(messages, topicPartition, 0);
- consumer.assign(Collections.<TopicPartition>emptyList());
+ consumer.assign(Collections.emptyList());
}
private void addRecords(final long messages,
final TopicPartition topicPartition,
final int startingOffset) {
for (int i = 0; i < messages; i++) {
- consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(),
topicPartition.partition(), startingOffset + i, new byte[0], new byte[0]));
+ consumer.addRecord(new ConsumerRecord<>(
+ topicPartition.topic(),
+ topicPartition.partition(),
+ startingOffset + i,
+ new byte[0],
+ new byte[0]));
}
}
private void assignPartition(final long messages,
final TopicPartition topicPartition) {
- consumer.updatePartitions(topicPartition.topic(),
- Collections.singletonList(
- new PartitionInfo(topicPartition.topic(),
-
topicPartition.partition(),
- null,
- null,
- null)));
+ consumer.updatePartitions(
+ topicPartition.topic(),
+ Collections.singletonList(new PartitionInfo(
+ topicPartition.topic(),
+ topicPartition.partition(),
+ null,
+ null,
+ null)));
consumer.updateBeginningOffsets(Collections.singletonMap(topicPartition, 0L));
consumer.updateEndOffsets(Collections.singletonMap(topicPartition,
Math.max(0, messages)));
consumer.assign(Collections.singletonList(topicPartition));
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index dd311fb..d499b7c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals;
-import java.time.Duration;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -34,7 +33,6 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Measurable;
-import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Serdes;
@@ -55,7 +53,6 @@ import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.PunctuationType;
-import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;
@@ -67,7 +64,6 @@ import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.StreamsTestUtils;
-import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.junit.Assert;
@@ -75,7 +71,7 @@ import org.junit.Before;
import org.junit.Test;
import java.io.File;
-import java.io.IOException;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -176,7 +172,7 @@ public class StreamThreadTest {
// assign single partition
assignedPartitions = singletonList(t1p1);
- thread.taskManager().setAssignmentMetadata(Collections.<TaskId,
Set<TopicPartition>>emptyMap(), Collections.<TaskId,
Set<TopicPartition>>emptyMap());
+ thread.taskManager().setAssignmentMetadata(Collections.emptyMap(),
Collections.emptyMap());
final MockConsumer<byte[], byte[]> mockConsumer =
(MockConsumer<byte[], byte[]>) thread.consumer;
mockConsumer.assign(assignedPartitions);
@@ -192,27 +188,23 @@ public class StreamThreadTest {
}
@Test
- public void testStateChangeStartClose() throws InterruptedException {
+ public void testStateChangeStartClose() throws Exception {
final StreamThread thread = createStreamThread(clientId, config,
false);
final StateListenerStub stateListener = new StateListenerStub();
thread.setStateListener(stateListener);
thread.start();
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return thread.state() == StreamThread.State.STARTING;
- }
- }, 10 * 1000, "Thread never started.");
+ TestUtils.waitForCondition(
+ () -> thread.state() == StreamThread.State.STARTING,
+ 10 * 1000,
+ "Thread never started.");
thread.shutdown();
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return thread.state() == StreamThread.State.DEAD;
- }
- }, 10 * 1000, "Thread never shut down.");
+ TestUtils.waitForCondition(
+ () -> thread.state() == StreamThread.State.DEAD,
+ 10 * 1000,
+ "Thread never shut down.");
thread.shutdown();
assertEquals(thread.state(), StreamThread.State.DEAD);
@@ -223,9 +215,9 @@ public class StreamThreadTest {
return new Cluster(
"mockClusterId",
singletonList(node),
- Collections.<PartitionInfo>emptySet(),
- Collections.<String>emptySet(),
- Collections.<String>emptySet(),
+ Collections.emptySet(),
+ Collections.emptySet(),
+ Collections.emptySet(),
node
);
}
@@ -289,8 +281,6 @@ public class StreamThreadTest {
defaultGroupName, thread.getName())));
}
-
- @SuppressWarnings({"unchecked", "ThrowableNotThrown"})
@Test
public void shouldNotCommitBeforeTheCommitInterval() {
final long commitInterval = 1000L;
@@ -302,7 +292,8 @@ public class StreamThreadTest {
final Consumer<byte[], byte[]> consumer =
EasyMock.createNiceMock(Consumer.class);
final TaskManager taskManager = mockTaskManagerCommit(consumer, 1, 1);
- final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new
StreamThread.StreamsMetricsThreadImpl(metrics, "");
+ final StreamThread.StreamsMetricsThreadImpl streamsMetrics
+ = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
final StreamThread thread = new StreamThread(
mockTime,
config,
@@ -350,7 +341,7 @@ public class StreamThreadTest {
Collections.singletonMap(
new TaskId(0, t1p1.partition()),
assignedPartitions),
- Collections.<TaskId, Set<TopicPartition>>emptyMap());
+ Collections.emptyMap());
final MockConsumer<byte[], byte[]> mockConsumer =
(MockConsumer<byte[], byte[]>) thread.consumer;
mockConsumer.assign(Collections.singleton(t1p1));
@@ -360,13 +351,13 @@ public class StreamThreadTest {
// processed one record, punctuated after the first record, and hence
num.iterations is still 1
long offset = -1;
- mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(),
t1p1.partition(), ++offset, 0, TimestampType.CREATE_TIME, -1, -1, -1, new
byte[0], new byte[0]));
+ addRecord(mockConsumer, ++offset, 0L);
thread.runOnce();
assertThat(thread.currentNumIterations(), equalTo(1));
// processed one more record without punctuation, and bump
num.iterations to 2
- mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(),
t1p1.partition(), ++offset, 1, TimestampType.CREATE_TIME, -1, -1, -1, new
byte[0], new byte[0]));
+ addRecord(mockConsumer, ++offset, 1L);
thread.runOnce();
assertThat(thread.currentNumIterations(), equalTo(2));
@@ -382,28 +373,28 @@ public class StreamThreadTest {
assertThat(thread.currentNumIterations(), equalTo(1));
// processed two records, bumping up iterations to 2
- mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(),
t1p1.partition(), ++offset, 5, TimestampType.CREATE_TIME, -1, -1, -1, new
byte[0], new byte[0]));
- mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(),
t1p1.partition(), ++offset, 6, TimestampType.CREATE_TIME, -1, -1, -1, new
byte[0], new byte[0]));
+ addRecord(mockConsumer, ++offset, 5L);
+ addRecord(mockConsumer, ++offset, 6L);
thread.runOnce();
assertThat(thread.currentNumIterations(), equalTo(2));
// stream time based punctutation halves to 1
- mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(),
t1p1.partition(), ++offset, 11, TimestampType.CREATE_TIME, -1, -1, -1, new
byte[0], new byte[0]));
+ addRecord(mockConsumer, ++offset, 11L);
thread.runOnce();
assertThat(thread.currentNumIterations(), equalTo(1));
// processed three records, bumping up iterations to 3 (1 + 2)
- mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(),
t1p1.partition(), ++offset, 12, TimestampType.CREATE_TIME, -1, -1, -1, new
byte[0], new byte[0]));
- mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(),
t1p1.partition(), ++offset, 13, TimestampType.CREATE_TIME, -1, -1, -1, new
byte[0], new byte[0]));
- mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(),
t1p1.partition(), ++offset, 14, TimestampType.CREATE_TIME, -1, -1, -1, new
byte[0], new byte[0]));
+ addRecord(mockConsumer, ++offset, 12L);
+ addRecord(mockConsumer, ++offset, 13L);
+ addRecord(mockConsumer, ++offset, 14L);
thread.runOnce();
assertThat(thread.currentNumIterations(), equalTo(3));
mockProcessor.requestCommit();
- mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(),
t1p1.partition(), ++offset, 15, TimestampType.CREATE_TIME, -1, -1, -1, new
byte[0], new byte[0]));
+ addRecord(mockConsumer, ++offset, 15L);
thread.runOnce();
// user requested commit should not impact on iteration adjustment
@@ -417,7 +408,6 @@ public class StreamThreadTest {
}
- @SuppressWarnings({"unchecked", "ThrowableNotThrown"})
@Test
public void shouldNotCauseExceptionIfNothingCommitted() {
final long commitInterval = 1000L;
@@ -429,7 +419,8 @@ public class StreamThreadTest {
final Consumer<byte[], byte[]> consumer =
EasyMock.createNiceMock(Consumer.class);
final TaskManager taskManager = mockTaskManagerCommit(consumer, 1, 0);
- final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new
StreamThread.StreamsMetricsThreadImpl(metrics, "");
+ final StreamThread.StreamsMetricsThreadImpl streamsMetrics
+ = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
final StreamThread thread = new StreamThread(
mockTime,
config,
@@ -453,8 +444,6 @@ public class StreamThreadTest {
EasyMock.verify(taskManager);
}
-
- @SuppressWarnings("unchecked")
@Test
public void shouldCommitAfterTheCommitInterval() {
final long commitInterval = 1000L;
@@ -466,7 +455,8 @@ public class StreamThreadTest {
final Consumer<byte[], byte[]> consumer =
EasyMock.createNiceMock(Consumer.class);
final TaskManager taskManager = mockTaskManagerCommit(consumer, 2, 1);
- final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new
StreamThread.StreamsMetricsThreadImpl(metrics, "");
+ final StreamThread.StreamsMetricsThreadImpl streamsMetrics
+ = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
final StreamThread thread = new StreamThread(
mockTime,
config,
@@ -490,8 +480,9 @@ public class StreamThreadTest {
EasyMock.verify(taskManager);
}
- @SuppressWarnings({"ThrowableNotThrown", "unchecked"})
- private TaskManager mockTaskManagerCommit(final Consumer<byte[], byte[]>
consumer, final int numberOfCommits, final int commits) {
+ private TaskManager mockTaskManagerCommit(final Consumer<byte[], byte[]>
consumer,
+ final int numberOfCommits,
+ final int commits) {
final TaskManager taskManager =
EasyMock.createNiceMock(TaskManager.class);
EasyMock.expect(taskManager.commitAll()).andReturn(commits).times(numberOfCommits);
EasyMock.replay(taskManager, consumer);
@@ -517,7 +508,7 @@ public class StreamThreadTest {
activeTasks.put(task1, Collections.singleton(t1p1));
activeTasks.put(task2, Collections.singleton(t1p2));
- thread.taskManager().setAssignmentMetadata(activeTasks,
Collections.<TaskId, Set<TopicPartition>>emptyMap());
+ thread.taskManager().setAssignmentMetadata(activeTasks,
Collections.emptyMap());
final MockConsumer<byte[], byte[]> mockConsumer =
(MockConsumer<byte[], byte[]>) thread.consumer;
mockConsumer.assign(assignedPartitions);
@@ -543,7 +534,7 @@ public class StreamThreadTest {
final StreamThread thread = createStreamThread(clientId, new
StreamsConfig(configProps(true)), true);
thread.setState(StreamThread.State.STARTING);
-
thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
+ thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
final List<TopicPartition> assignedPartitions = new ArrayList<>();
@@ -554,7 +545,7 @@ public class StreamThreadTest {
activeTasks.put(task1, Collections.singleton(t1p1));
activeTasks.put(task2, Collections.singleton(t1p2));
- thread.taskManager().setAssignmentMetadata(activeTasks,
Collections.<TaskId, Set<TopicPartition>>emptyMap());
+ thread.taskManager().setAssignmentMetadata(activeTasks,
Collections.emptyMap());
final MockConsumer<byte[], byte[]> mockConsumer =
(MockConsumer<byte[], byte[]>) thread.consumer;
mockConsumer.assign(assignedPartitions);
@@ -578,7 +569,7 @@ public class StreamThreadTest {
final StreamThread thread = createStreamThread(clientId, new
StreamsConfig(configProps(true)), true);
thread.setState(StreamThread.State.STARTING);
-
thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
+ thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
final List<TopicPartition> assignedPartitions = new ArrayList<>();
@@ -589,7 +580,7 @@ public class StreamThreadTest {
activeTasks.put(task1, Collections.singleton(t1p1));
activeTasks.put(task2, Collections.singleton(t1p2));
- thread.taskManager().setAssignmentMetadata(activeTasks,
Collections.<TaskId, Set<TopicPartition>>emptyMap());
+ thread.taskManager().setAssignmentMetadata(activeTasks,
Collections.emptyMap());
final MockConsumer<byte[], byte[]> mockConsumer =
(MockConsumer<byte[], byte[]>) thread.consumer;
mockConsumer.assign(assignedPartitions);
final Map<TopicPartition, Long> beginOffsets = new HashMap<>();
@@ -607,7 +598,6 @@ public class StreamThreadTest {
}
}
- @SuppressWarnings("unchecked")
@Test
public void shouldShutdownTaskManagerOnClose() {
final Consumer<byte[], byte[]> consumer =
EasyMock.createNiceMock(Consumer.class);
@@ -616,7 +606,8 @@ public class StreamThreadTest {
EasyMock.expectLastCall();
EasyMock.replay(taskManager, consumer);
- final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new
StreamThread.StreamsMetricsThreadImpl(metrics, "");
+ final StreamThread.StreamsMetricsThreadImpl streamsMetrics
+ = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
final StreamThread thread = new StreamThread(
mockTime,
config,
@@ -632,19 +623,15 @@ public class StreamThreadTest {
new AtomicInteger()
);
thread.setStateListener(
- new StreamThread.StateListener() {
- @Override
- public void onChange(final Thread t, final
ThreadStateTransitionValidator newState, final ThreadStateTransitionValidator
oldState) {
- if (oldState == StreamThread.State.CREATED && newState ==
StreamThread.State.STARTING) {
- thread.shutdown();
- }
+ (t, newState, oldState) -> {
+ if (oldState == StreamThread.State.CREATED && newState ==
StreamThread.State.STARTING) {
+ thread.shutdown();
}
});
thread.run();
EasyMock.verify(taskManager);
}
- @SuppressWarnings("unchecked")
@Test
public void shouldShutdownTaskManagerOnCloseWithoutStart() {
final Consumer<byte[], byte[]> consumer =
EasyMock.createNiceMock(Consumer.class);
@@ -653,7 +640,8 @@ public class StreamThreadTest {
EasyMock.expectLastCall();
EasyMock.replay(taskManager, consumer);
- final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new
StreamThread.StreamsMetricsThreadImpl(metrics, "");
+ final StreamThread.StreamsMetricsThreadImpl streamsMetrics
+ = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
final StreamThread thread = new StreamThread(
mockTime,
config,
@@ -672,7 +660,6 @@ public class StreamThreadTest {
EasyMock.verify(taskManager);
}
- @SuppressWarnings("unchecked")
@Test
public void shouldOnlyShutdownOnce() {
final Consumer<byte[], byte[]> consumer =
EasyMock.createNiceMock(Consumer.class);
@@ -681,7 +668,8 @@ public class StreamThreadTest {
EasyMock.expectLastCall();
EasyMock.replay(taskManager, consumer);
- final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new
StreamThread.StreamsMetricsThreadImpl(metrics, "");
+ final StreamThread.StreamsMetricsThreadImpl streamsMetrics
+ = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
final StreamThread thread = new StreamThread(
mockTime,
config,
@@ -717,10 +705,10 @@ public class StreamThreadTest {
// assign single partition
standbyTasks.put(task1, Collections.singleton(t1p1));
- thread.taskManager().setAssignmentMetadata(Collections.<TaskId,
Set<TopicPartition>>emptyMap(), standbyTasks);
-
thread.taskManager().createTasks(Collections.<TopicPartition>emptyList());
+ thread.taskManager().setAssignmentMetadata(Collections.emptyMap(),
standbyTasks);
+ thread.taskManager().createTasks(Collections.emptyList());
-
thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptyList());
+ thread.rebalanceListener.onPartitionsAssigned(Collections.emptyList());
}
@Test
@@ -744,7 +732,7 @@ public class StreamThreadTest {
assignedPartitions.add(t1p1);
activeTasks.put(task1, Collections.singleton(t1p1));
- thread.taskManager().setAssignmentMetadata(activeTasks,
Collections.<TaskId, Set<TopicPartition>>emptyMap());
+ thread.taskManager().setAssignmentMetadata(activeTasks,
Collections.emptyMap());
final MockConsumer<byte[], byte[]> mockConsumer =
(MockConsumer<byte[], byte[]>) thread.consumer;
mockConsumer.assign(assignedPartitions);
@@ -768,12 +756,7 @@ public class StreamThreadTest {
assertFalse(producer.transactionCommitted());
mockTime.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG)
+ 1L);
TestUtils.waitForCondition(
- new TestCondition() {
- @Override
- public boolean conditionMet() {
- return producer.commitCount() == 1;
- }
- },
+ () -> producer.commitCount() == 1,
"StreamsThread did not commit transaction.");
producer.fenceProducer();
@@ -784,12 +767,7 @@ public class StreamThreadTest {
fail("Should have thrown TaskMigratedException");
} catch (final TaskMigratedException expected) { /* ignore */ }
TestUtils.waitForCondition(
- new TestCondition() {
- @Override
- public boolean conditionMet() {
- return thread.tasks().isEmpty();
- }
- },
+ () -> thread.tasks().isEmpty(),
"StreamsThread did not remove fenced zombie task.");
assertThat(producer.commitCount(), equalTo(1L));
@@ -812,7 +790,7 @@ public class StreamThreadTest {
assignedPartitions.add(t1p1);
activeTasks.put(task1, Collections.singleton(t1p1));
- thread.taskManager().setAssignmentMetadata(activeTasks,
Collections.<TaskId, Set<TopicPartition>>emptyMap());
+ thread.taskManager().setAssignmentMetadata(activeTasks,
Collections.emptyMap());
final MockConsumer<byte[], byte[]> mockConsumer =
(MockConsumer<byte[], byte[]>) thread.consumer;
mockConsumer.assign(assignedPartitions);
@@ -848,7 +826,7 @@ public class StreamThreadTest {
assignedPartitions.add(t1p1);
activeTasks.put(task1, Collections.singleton(t1p1));
- thread.taskManager().setAssignmentMetadata(activeTasks,
Collections.<TaskId, Set<TopicPartition>>emptyMap());
+ thread.taskManager().setAssignmentMetadata(activeTasks,
Collections.emptyMap());
final MockConsumer<byte[], byte[]> mockConsumer =
(MockConsumer<byte[], byte[]>) thread.consumer;
mockConsumer.assign(assignedPartitions);
@@ -904,7 +882,7 @@ public class StreamThreadTest {
assignedPartitions.add(t1p1);
activeTasks.put(task1, Collections.singleton(t1p1));
- thread.taskManager().setAssignmentMetadata(activeTasks,
Collections.<TaskId, Set<TopicPartition>>emptyMap());
+ thread.taskManager().setAssignmentMetadata(activeTasks,
Collections.emptyMap());
final MockConsumer<byte[], byte[]> mockConsumer =
(MockConsumer<byte[], byte[]>) thread.consumer;
mockConsumer.assign(assignedPartitions);
@@ -922,7 +900,7 @@ public class StreamThreadTest {
@Test
public void shouldReturnStandbyTaskMetadataWhileRunningState() {
internalStreamsBuilder.stream(Collections.singleton(topic1), consumed)
- .groupByKey().count(Materialized.<Object, Long,
KeyValueStore<Bytes, byte[]>>as("count-one"));
+ .groupByKey().count(Materialized.as("count-one"));
internalStreamsBuilder.buildAndOptimizeTopology();
final StreamThread thread = createStreamThread(clientId, config,
false);
@@ -951,9 +929,9 @@ public class StreamThreadTest {
// assign single partition
standbyTasks.put(task1, Collections.singleton(t1p1));
- thread.taskManager().setAssignmentMetadata(Collections.<TaskId,
Set<TopicPartition>>emptyMap(), standbyTasks);
+ thread.taskManager().setAssignmentMetadata(Collections.emptyMap(),
standbyTasks);
-
thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptyList());
+ thread.rebalanceListener.onPartitionsAssigned(Collections.emptyList());
thread.runOnce();
@@ -965,17 +943,20 @@ public class StreamThreadTest {
@SuppressWarnings("unchecked")
@Test
- public void shouldUpdateStandbyTask() throws IOException {
+ public void shouldUpdateStandbyTask() throws Exception {
final String storeName1 = "count-one";
final String storeName2 = "table-two";
final String changelogName1 = applicationId + "-" + storeName1 +
"-changelog";
final String changelogName2 = applicationId + "-" + storeName2 +
"-changelog";
final TopicPartition partition1 = new TopicPartition(changelogName1,
1);
final TopicPartition partition2 = new TopicPartition(changelogName2,
1);
- internalStreamsBuilder.stream(Collections.singleton(topic1), consumed)
- .groupByKey().count(Materialized.<Object, Long,
KeyValueStore<Bytes, byte[]>>as(storeName1));
- final MaterializedInternal materialized = new
MaterializedInternal(Materialized.as(storeName2), internalStreamsBuilder, "");
- internalStreamsBuilder.table(topic2, new ConsumedInternal(),
materialized);
+ internalStreamsBuilder
+ .stream(Collections.singleton(topic1), consumed)
+ .groupByKey()
+ .count(Materialized.as(storeName1));
+ final MaterializedInternal<Object, Object, KeyValueStore<Bytes,
byte[]>> materialized
+ = new MaterializedInternal<>(Materialized.as(storeName2),
internalStreamsBuilder, "");
+ internalStreamsBuilder.table(topic2, new ConsumedInternal<>(),
materialized);
internalStreamsBuilder.buildAndOptimizeTopology();
final StreamThread thread = createStreamThread(clientId, config,
false);
@@ -998,12 +979,23 @@ public class StreamThreadTest {
restoreConsumer.updateEndOffsets(Collections.singletonMap(partition2,
10L));
restoreConsumer.updateBeginningOffsets(Collections.singletonMap(partition2,
0L));
// let the store1 be restored from 0 to 10; store2 be restored from 5
(checkpointed) to 10
- final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new
File(stateDirectory.directoryForTask(task3), CHECKPOINT_FILE_NAME));
+ final OffsetCheckpoint checkpoint
+ = new OffsetCheckpoint(new
File(stateDirectory.directoryForTask(task3), CHECKPOINT_FILE_NAME));
checkpoint.write(Collections.singletonMap(partition2, 5L));
for (long i = 0L; i < 10L; i++) {
- restoreConsumer.addRecord(new ConsumerRecord<>(changelogName1, 1,
i, ("K" + i).getBytes(), ("V" + i).getBytes()));
- restoreConsumer.addRecord(new ConsumerRecord<>(changelogName2, 1,
i, ("K" + i).getBytes(), ("V" + i).getBytes()));
+ restoreConsumer.addRecord(new ConsumerRecord<>(
+ changelogName1,
+ 1,
+ i,
+ ("K" + i).getBytes(),
+ ("V" + i).getBytes()));
+ restoreConsumer.addRecord(new ConsumerRecord<>(
+ changelogName2,
+ 1,
+ i,
+ ("K" + i).getBytes(),
+ ("V" + i).getBytes()));
}
thread.setState(StreamThread.State.STARTING);
@@ -1015,9 +1007,9 @@ public class StreamThreadTest {
standbyTasks.put(task1, Collections.singleton(t1p1));
standbyTasks.put(task3, Collections.singleton(t2p1));
- thread.taskManager().setAssignmentMetadata(Collections.<TaskId,
Set<TopicPartition>>emptyMap(), standbyTasks);
+ thread.taskManager().setAssignmentMetadata(Collections.emptyMap(),
standbyTasks);
-
thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptyList());
+ thread.rebalanceListener.onPartitionsAssigned(Collections.emptyList());
thread.runOnce();
@@ -1035,33 +1027,19 @@ public class StreamThreadTest {
public void shouldPunctuateActiveTask() {
final List<Long> punctuatedStreamTime = new ArrayList<>();
final List<Long> punctuatedWallClockTime = new ArrayList<>();
- final ProcessorSupplier<Object, Object> punctuateProcessor = new
ProcessorSupplier<Object, Object>() {
+ final ProcessorSupplier<Object, Object> punctuateProcessor = () -> new
Processor<Object, Object>() {
@Override
- public Processor<Object, Object> get() {
- return new Processor<Object, Object>() {
- @Override
- public void init(final ProcessorContext context) {
- context.schedule(Duration.ofMillis(100L),
PunctuationType.STREAM_TIME, new Punctuator() {
- @Override
- public void punctuate(final long timestamp) {
- punctuatedStreamTime.add(timestamp);
- }
- });
- context.schedule(Duration.ofMillis(100L),
PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
- @Override
- public void punctuate(final long timestamp) {
- punctuatedWallClockTime.add(timestamp);
- }
- });
- }
-
- @Override
- public void process(final Object key, final Object value)
{}
-
- @Override
- public void close() {}
- };
+ public void init(final ProcessorContext context) {
+ context.schedule(Duration.ofMillis(100L),
PunctuationType.STREAM_TIME, punctuatedStreamTime::add);
+ context.schedule(Duration.ofMillis(100L),
PunctuationType.WALL_CLOCK_TIME, punctuatedWallClockTime::add);
}
+
+ @Override
+ public void process(final Object key,
+ final Object value) {}
+
+ @Override
+ public void close() {}
};
internalStreamsBuilder.stream(Collections.singleton(topic1),
consumed).process(punctuateProcessor);
@@ -1079,7 +1057,7 @@ public class StreamThreadTest {
assignedPartitions.add(t1p1);
activeTasks.put(task1, Collections.singleton(t1p1));
- thread.taskManager().setAssignmentMetadata(activeTasks,
Collections.<TaskId, Set<TopicPartition>>emptyMap());
+ thread.taskManager().setAssignmentMetadata(activeTasks,
Collections.emptyMap());
clientSupplier.consumer.assign(assignedPartitions);
clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1p1,
0L));
@@ -1092,7 +1070,17 @@ public class StreamThreadTest {
mockTime.sleep(100L);
for (long i = 0L; i < 10L; i++) {
- clientSupplier.consumer.addRecord(new ConsumerRecord<>(topic1, 1,
i, i * 100L, TimestampType.CREATE_TIME, ConsumerRecord.NULL_CHECKSUM, ("K" +
i).getBytes().length, ("V" + i).getBytes().length, ("K" + i).getBytes(), ("V" +
i).getBytes()));
+ clientSupplier.consumer.addRecord(new ConsumerRecord<>(
+ topic1,
+ 1,
+ i,
+ i * 100L,
+ TimestampType.CREATE_TIME,
+ ConsumerRecord.NULL_CHECKSUM,
+ ("K" + i).getBytes().length,
+ ("V" + i).getBytes().length,
+ ("K" + i).getBytes(),
+ ("V" + i).getBytes()));
}
thread.runOnce();
@@ -1126,7 +1114,7 @@ public class StreamThreadTest {
@Test
public void
shouldAlwaysReturnEmptyTasksMetadataWhileRebalancingStateAndTasksNotRunning() {
internalStreamsBuilder.stream(Collections.singleton(topic1), consumed)
- .groupByKey().count(Materialized.<Object, Long,
KeyValueStore<Bytes, byte[]>>as("count-one"));
+ .groupByKey().count(Materialized.as("count-one"));
internalStreamsBuilder.buildAndOptimizeTopology();
final StreamThread thread = createStreamThread(clientId, config,
false);
@@ -1176,7 +1164,7 @@ public class StreamThreadTest {
@Test
public void
shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestore() throws
Exception {
internalStreamsBuilder.stream(Collections.singleton("topic"), consumed)
- .groupByKey().count(Materialized.<Object, Long,
KeyValueStore<Bytes, byte[]>>as("count"));
+ .groupByKey().count(Materialized.as("count"));
internalStreamsBuilder.buildAndOptimizeTopology();
final StreamThread thread = createStreamThread("clientId", config,
false);
@@ -1188,7 +1176,7 @@ public class StreamThreadTest {
final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
activeTasks.put(new TaskId(0, 0), topicPartitionSet);
- thread.taskManager().setAssignmentMetadata(activeTasks,
Collections.<TaskId, Set<TopicPartition>>emptyMap());
+ thread.taskManager().setAssignmentMetadata(activeTasks,
Collections.emptyMap());
mockConsumer.updatePartitions(
"topic",
@@ -1222,32 +1210,28 @@ public class StreamThreadTest {
mockRestoreConsumer.updateBeginningOffsets(Collections.singletonMap(changelogPartition,
0L));
mockRestoreConsumer.updateEndOffsets(Collections.singletonMap(changelogPartition,
2L));
- mockConsumer.schedulePollTask(new Runnable() {
- @Override
- public void run() {
- thread.setState(StreamThread.State.PARTITIONS_REVOKED);
-
thread.rebalanceListener.onPartitionsAssigned(topicPartitionSet);
- }
+ mockConsumer.schedulePollTask(() -> {
+ thread.setState(StreamThread.State.PARTITIONS_REVOKED);
+ thread.rebalanceListener.onPartitionsAssigned(topicPartitionSet);
});
try {
thread.start();
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return mockRestoreConsumer.assignment().size() == 1;
- }
- }, "Never restore first record");
+ TestUtils.waitForCondition(
+ () -> mockRestoreConsumer.assignment().size() == 1,
+ "Never restore first record");
- mockRestoreConsumer.addRecord(new
ConsumerRecord<>("stream-thread-test-count-changelog", 0, 0L, "K1".getBytes(),
"V1".getBytes()));
+ mockRestoreConsumer.addRecord(new ConsumerRecord<>(
+ "stream-thread-test-count-changelog",
+ 0,
+ 0L,
+ "K1".getBytes(),
+ "V1".getBytes()));
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return mockRestoreConsumer.position(changelogPartition) ==
1L;
- }
- }, "Never restore first record");
+ TestUtils.waitForCondition(
+ () -> mockRestoreConsumer.position(changelogPartition) == 1L,
+ "Never restore first record");
mockRestoreConsumer.setException(new InvalidOffsetException("Try
Again!") {
@Override
@@ -1256,16 +1240,25 @@ public class StreamThreadTest {
}
});
- mockRestoreConsumer.addRecord(new
ConsumerRecord<>("stream-thread-test-count-changelog", 0, 0L, "K1".getBytes(),
"V1".getBytes()));
- mockRestoreConsumer.addRecord(new
ConsumerRecord<>("stream-thread-test-count-changelog", 0, 1L, "K2".getBytes(),
"V2".getBytes()));
+ mockRestoreConsumer.addRecord(new ConsumerRecord<>(
+ "stream-thread-test-count-changelog",
+ 0,
+ 0L,
+ "K1".getBytes(),
+ "V1".getBytes()));
+ mockRestoreConsumer.addRecord(new ConsumerRecord<>(
+ "stream-thread-test-count-changelog",
+ 0,
+ 1L,
+ "K2".getBytes(),
+ "V2".getBytes()));
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
+ TestUtils.waitForCondition(
+ () -> {
mockRestoreConsumer.assign(changelogPartitionSet);
return mockRestoreConsumer.position(changelogPartition) ==
2L;
- }
- }, "Never finished restore");
+ },
+ "Never finished restore");
} finally {
thread.shutdown();
thread.join(10000);
@@ -1279,7 +1272,9 @@ public class StreamThreadTest {
internalTopologyBuilder.addSource(null, "source1", null, null, null,
topic1);
final Properties config = configProps(false);
-
config.setProperty(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueExceptionHandler.class.getName());
+ config.setProperty(
+
StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
+ LogAndContinueExceptionHandler.class.getName());
config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.Integer().getClass().getName());
final StreamThread thread = createStreamThread(clientId, new
StreamsConfig(config), false);
@@ -1291,7 +1286,7 @@ public class StreamThreadTest {
Collections.singletonMap(
new TaskId(0, t1p1.partition()),
assignedPartitions),
- Collections.<TaskId, Set<TopicPartition>>emptyMap());
+ Collections.emptyMap());
final MockConsumer<byte[], byte[]> mockConsumer =
(MockConsumer<byte[], byte[]>) thread.consumer;
mockConsumer.assign(Collections.singleton(t1p1));
@@ -1299,14 +1294,39 @@ public class StreamThreadTest {
thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
thread.runOnce();
- final MetricName skippedTotalMetric =
metrics.metricName("skipped-records-total", "stream-metrics",
Collections.singletonMap("client-id", thread.getName()));
- final MetricName skippedRateMetric =
metrics.metricName("skipped-records-rate", "stream-metrics",
Collections.singletonMap("client-id", thread.getName()));
+ final MetricName skippedTotalMetric = metrics.metricName(
+ "skipped-records-total",
+ "stream-metrics",
+ Collections.singletonMap("client-id", thread.getName()));
+ final MetricName skippedRateMetric = metrics.metricName(
+ "skipped-records-rate",
+ "stream-metrics",
+ Collections.singletonMap("client-id", thread.getName()));
assertEquals(0.0, metrics.metric(skippedTotalMetric).metricValue());
assertEquals(0.0, metrics.metric(skippedRateMetric).metricValue());
long offset = -1;
- mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(),
t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new
byte[0], "I am not an integer.".getBytes()));
- mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(),
t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new
byte[0], "I am not an integer.".getBytes()));
+ mockConsumer.addRecord(new ConsumerRecord<>(
+ t1p1.topic(),
+ t1p1.partition(),
+ ++offset, -1,
+ TimestampType.CREATE_TIME,
+ ConsumerRecord.NULL_CHECKSUM,
+ -1,
+ -1,
+ new byte[0],
+ "I am not an integer.".getBytes()));
+ mockConsumer.addRecord(new ConsumerRecord<>(
+ t1p1.topic(),
+ t1p1.partition(),
+ ++offset,
+ -1,
+ TimestampType.CREATE_TIME,
+ ConsumerRecord.NULL_CHECKSUM,
+ -1,
+ -1,
+ new byte[0],
+ "I am not an integer.".getBytes()));
thread.runOnce();
assertEquals(2.0, metrics.metric(skippedTotalMetric).metricValue());
assertNotEquals(0.0, metrics.metric(skippedRateMetric).metricValue());
@@ -1324,7 +1344,9 @@ public class StreamThreadTest {
internalTopologyBuilder.addSource(null, "source1", null, null, null,
topic1);
final Properties config = configProps(false);
-
config.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
LogAndSkipOnInvalidTimestamp.class.getName());
+ config.setProperty(
+ StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
+ LogAndSkipOnInvalidTimestamp.class.getName());
final StreamThread thread = createStreamThread(clientId, new
StreamsConfig(config), false);
thread.setState(StreamThread.State.STARTING);
@@ -1335,7 +1357,7 @@ public class StreamThreadTest {
Collections.singletonMap(
new TaskId(0, t1p1.partition()),
assignedPartitions),
- Collections.<TaskId, Set<TopicPartition>>emptyMap());
+ Collections.emptyMap());
final MockConsumer<byte[], byte[]> mockConsumer =
(MockConsumer<byte[], byte[]>) thread.consumer;
mockConsumer.assign(Collections.singleton(t1p1));
@@ -1343,28 +1365,34 @@ public class StreamThreadTest {
thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
thread.runOnce();
- final MetricName skippedTotalMetric =
metrics.metricName("skipped-records-total", "stream-metrics",
Collections.singletonMap("client-id", thread.getName()));
- final MetricName skippedRateMetric =
metrics.metricName("skipped-records-rate", "stream-metrics",
Collections.singletonMap("client-id", thread.getName()));
+ final MetricName skippedTotalMetric = metrics.metricName(
+ "skipped-records-total",
+ "stream-metrics",
+ Collections.singletonMap("client-id", thread.getName()));
+ final MetricName skippedRateMetric = metrics.metricName(
+ "skipped-records-rate",
+ "stream-metrics",
+ Collections.singletonMap("client-id", thread.getName()));
assertEquals(0.0, metrics.metric(skippedTotalMetric).metricValue());
assertEquals(0.0, metrics.metric(skippedRateMetric).metricValue());
long offset = -1;
- mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(),
t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new
byte[0], new byte[0]));
- mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(),
t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new
byte[0], new byte[0]));
+ addRecord(mockConsumer, ++offset);
+ addRecord(mockConsumer, ++offset);
thread.runOnce();
assertEquals(2.0, metrics.metric(skippedTotalMetric).metricValue());
assertNotEquals(0.0, metrics.metric(skippedRateMetric).metricValue());
- mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(),
t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new
byte[0], new byte[0]));
- mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(),
t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new
byte[0], new byte[0]));
- mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(),
t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new
byte[0], new byte[0]));
- mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(),
t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new
byte[0], new byte[0]));
+ addRecord(mockConsumer, ++offset);
+ addRecord(mockConsumer, ++offset);
+ addRecord(mockConsumer, ++offset);
+ addRecord(mockConsumer, ++offset);
thread.runOnce();
assertEquals(6.0, metrics.metric(skippedTotalMetric).metricValue());
assertNotEquals(0.0, metrics.metric(skippedRateMetric).metricValue());
- mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(),
t1p1.partition(), ++offset, 1, TimestampType.CREATE_TIME, -1, -1, -1, new
byte[0], new byte[0]));
- mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(),
t1p1.partition(), ++offset, 1, TimestampType.CREATE_TIME, -1, -1, -1, new
byte[0], new byte[0]));
+ addRecord(mockConsumer, ++offset, 1L);
+ addRecord(mockConsumer, ++offset, 1L);
thread.runOnce();
assertEquals(6.0, metrics.metric(skippedTotalMetric).metricValue());
assertNotEquals(0.0, metrics.metric(skippedRateMetric).metricValue());
@@ -1417,7 +1445,8 @@ public class StreamThreadTest {
final Consumer<byte[], byte[]> consumer =
EasyMock.createNiceMock(Consumer.class);
final TaskManager taskManager = mockTaskManagerCommit(consumer, 1, 0);
- final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new
StreamThread.StreamsMetricsThreadImpl(metrics, "");
+ final StreamThread.StreamsMetricsThreadImpl streamsMetrics
+ = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
final StreamThread thread = new StreamThread(
mockTime,
config,
@@ -1432,18 +1461,13 @@ public class StreamThreadTest {
new LogContext(""),
new AtomicInteger()
);
- final MetricName testMetricName = new MetricName("test_metric", "",
"", new HashMap<String, String>());
+ final MetricName testMetricName = new MetricName("test_metric", "",
"", new HashMap<>());
final Metric testMetric = new KafkaMetric(
- new Object(),
- testMetricName,
- new Measurable() {
- @Override
- public double measure(final MetricConfig config, final
long now) {
- return 0;
- }
- },
- null,
- new MockTime());
+ new Object(),
+ testMetricName,
+ (Measurable) (config, now) -> 0,
+ null,
+ new MockTime());
producer.setMockMetrics(testMetricName, testMetric);
final Map<MetricName, Metric> producerMetrics =
thread.producerMetrics();
assertEquals(testMetricName,
producerMetrics.get(testMetricName).metricName());
@@ -1461,7 +1485,8 @@ public class StreamThreadTest {
final Consumer<byte[], byte[]> consumer =
EasyMock.createNiceMock(Consumer.class);
final TaskManager taskManager =
EasyMock.createNiceMock(TaskManager.class);
- final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new
StreamThread.StreamsMetricsThreadImpl(metrics, "");
+ final StreamThread.StreamsMetricsThreadImpl streamsMetrics
+ = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
final StreamThread thread = new StreamThread(
mockTime,
config,
@@ -1476,18 +1501,13 @@ public class StreamThreadTest {
new LogContext(""),
new AtomicInteger()
);
- final MetricName testMetricName = new MetricName("test_metric", "",
"", new HashMap<String, String>());
+ final MetricName testMetricName = new MetricName("test_metric", "",
"", new HashMap<>());
final Metric testMetric = new KafkaMetric(
- new Object(),
- testMetricName,
- new Measurable() {
- @Override
- public double measure(final MetricConfig config, final
long now) {
- return 0;
- }
- },
- null,
- new MockTime());
+ new Object(),
+ testMetricName,
+ (Measurable) (config, now) -> 0,
+ null,
+ new MockTime());
EasyMock.expect(taskManager.getAdminClient()).andReturn(adminClient);
@@ -1498,4 +1518,25 @@ public class StreamThreadTest {
final Map<MetricName, Metric> adminClientMetrics =
thread.adminClientMetrics();
assertEquals(testMetricName,
adminClientMetrics.get(testMetricName).metricName());
}
+
+ private void addRecord(final MockConsumer<byte[], byte[]> mockConsumer,
+ final long offset) {
+ addRecord(mockConsumer, offset, -1L);
+ }
+
+ private void addRecord(final MockConsumer<byte[], byte[]> mockConsumer,
+ final long offset,
+ final long timestamp) {
+ mockConsumer.addRecord(new ConsumerRecord<>(
+ t1p1.topic(),
+ t1p1.partition(),
+ offset,
+ timestamp,
+ TimestampType.CREATE_TIME,
+ ConsumerRecord.NULL_CHECKSUM,
+ -1,
+ -1,
+ new byte[0],
+ new byte[0]));
+ }
}