This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3991d81 MINOR: code cleanup (#6056)
3991d81 is described below
commit 3991d81f6c645bdf36c58e3d56b829ff92dbff3a
Author: Matthias J. Sax <[email protected]>
AuthorDate: Tue Jan 8 22:32:53 2019 +0100
MINOR: code cleanup (#6056)
Reviewers: Bill Bejeck <[email protected]>, John Roesler
<[email protected]>, Guozhang Wang <[email protected]>
---
.../apache/kafka/streams/StreamsBuilderTest.java | 245 +++++++++++------
.../org/apache/kafka/streams/TopologyTest.java | 36 +--
.../streams/integration/EosIntegrationTest.java | 96 +++----
.../KStreamAggregationIntegrationTest.java | 4 +-
.../kstream/internals/KGroupedStreamImplTest.java | 272 ++++++++-----------
.../kstream/internals/KGroupedTableImplTest.java | 234 +++++++++-------
.../kstream/internals/KStreamKStreamJoinTest.java | 31 ++-
.../kstream/internals/KStreamPrintTest.java | 48 ++--
...KStreamSessionWindowAggregateProcessorTest.java | 49 ++--
.../streams/kstream/internals/KTableImplTest.java | 301 +++++++++++----------
.../kstream/internals/KTableMapValuesTest.java | 35 ++-
.../internals/KTableTransformValuesTest.java | 25 +-
.../internals/TimeWindowedKStreamImplTest.java | 118 ++++----
.../apache/kafka/streams/perf/SimpleBenchmark.java | 4 +-
.../processor/internals/AbstractTaskTest.java | 4 +-
.../internals/ProcessorStateManagerTest.java | 4 +-
.../CompositeReadOnlySessionStoreTest.java | 9 +-
.../state/internals/RocksDBSessionStoreTest.java | 4 +-
.../apache/kafka/streams/TopologyTestDriver.java | 4 +-
19 files changed, 818 insertions(+), 705 deletions(-)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 894b561..9e88a87 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -45,9 +45,9 @@ import java.util.Properties;
import static java.util.Arrays.asList;
import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
public class StreamsBuilderTest {
@@ -64,84 +64,143 @@ public class StreamsBuilderTest {
@Test
public void shouldAllowJoinUnmaterializedFilteredKTable() {
- final KTable<Bytes, String> filteredKTable = builder.<Bytes,
String>table("table-topic").filter(MockPredicate.allGoodPredicate());
- builder.<Bytes, String>stream("stream-topic").join(filteredKTable,
MockValueJoiner.TOSTRING_JOINER);
+ final KTable<Bytes, String> filteredKTable = builder
+ .<Bytes, String>table("table-topic")
+ .filter(MockPredicate.allGoodPredicate());
+ builder
+ .<Bytes, String>stream("stream-topic")
+ .join(filteredKTable, MockValueJoiner.TOSTRING_JOINER);
builder.build();
- final ProcessorTopology topology =
builder.internalTopologyBuilder.rewriteTopology(new
StreamsConfig(props)).build();
-
- assertThat(topology.stateStores().size(), equalTo(1));
-
assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"),
equalTo(Collections.singleton(topology.stateStores().get(0).name())));
-
assertThat(topology.processorConnectedStateStores("KTABLE-FILTER-0000000003").isEmpty(),
is(true));
+ final ProcessorTopology topology =
+ builder.internalTopologyBuilder.rewriteTopology(new
StreamsConfig(props)).build();
+
+ assertThat(
+ topology.stateStores().size(),
+ equalTo(1));
+ assertThat(
+ topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"),
+
equalTo(Collections.singleton(topology.stateStores().get(0).name())));
+ assertTrue(
+
topology.processorConnectedStateStores("KTABLE-FILTER-0000000003").isEmpty());
}
@Test
public void shouldAllowJoinMaterializedFilteredKTable() {
- final KTable<Bytes, String> filteredKTable = builder.<Bytes,
String>table("table-topic")
- .filter(MockPredicate.allGoodPredicate(),
Materialized.as("store"));
- builder.<Bytes, String>stream("stream-topic").join(filteredKTable,
MockValueJoiner.TOSTRING_JOINER);
+ final KTable<Bytes, String> filteredKTable = builder
+ .<Bytes, String>table("table-topic")
+ .filter(MockPredicate.allGoodPredicate(),
Materialized.as("store"));
+ builder
+ .<Bytes, String>stream("stream-topic")
+ .join(filteredKTable, MockValueJoiner.TOSTRING_JOINER);
builder.build();
- final ProcessorTopology topology =
builder.internalTopologyBuilder.rewriteTopology(new
StreamsConfig(props)).build();
-
- assertThat(topology.stateStores().size(), equalTo(1));
-
assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"),
equalTo(Collections.singleton("store")));
-
assertThat(topology.processorConnectedStateStores("KTABLE-FILTER-0000000003"),
equalTo(Collections.singleton("store")));
+ final ProcessorTopology topology =
+ builder.internalTopologyBuilder.rewriteTopology(new
StreamsConfig(props)).build();
+
+ assertThat(
+ topology.stateStores().size(),
+ equalTo(1));
+ assertThat(
+ topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"),
+ equalTo(Collections.singleton("store")));
+ assertThat(
+ topology.processorConnectedStateStores("KTABLE-FILTER-0000000003"),
+ equalTo(Collections.singleton("store")));
}
@Test
public void shouldAllowJoinUnmaterializedMapValuedKTable() {
- final KTable<Bytes, String> mappedKTable = builder.<Bytes,
String>table("table-topic").mapValues(MockMapper.noOpValueMapper());
- builder.<Bytes, String>stream("stream-topic").join(mappedKTable,
MockValueJoiner.TOSTRING_JOINER);
+ final KTable<Bytes, String> mappedKTable = builder
+ .<Bytes, String>table("table-topic")
+ .mapValues(MockMapper.noOpValueMapper());
+ builder
+ .<Bytes, String>stream("stream-topic")
+ .join(mappedKTable, MockValueJoiner.TOSTRING_JOINER);
builder.build();
- final ProcessorTopology topology =
builder.internalTopologyBuilder.rewriteTopology(new
StreamsConfig(props)).build();
-
- assertThat(topology.stateStores().size(), equalTo(1));
-
assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"),
equalTo(Collections.singleton(topology.stateStores().get(0).name())));
-
assertThat(topology.processorConnectedStateStores("KTABLE-MAPVALUES-0000000003").isEmpty(),
is(true));
+ final ProcessorTopology topology =
+ builder.internalTopologyBuilder.rewriteTopology(new
StreamsConfig(props)).build();
+
+ assertThat(
+ topology.stateStores().size(),
+ equalTo(1));
+ assertThat(
+ topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"),
+
equalTo(Collections.singleton(topology.stateStores().get(0).name())));
+ assertTrue(
+
topology.processorConnectedStateStores("KTABLE-MAPVALUES-0000000003").isEmpty());
}
@Test
public void shouldAllowJoinMaterializedMapValuedKTable() {
- final KTable<Bytes, String> mappedKTable = builder.<Bytes,
String>table("table-topic")
- .mapValues(MockMapper.noOpValueMapper(),
Materialized.as("store"));
- builder.<Bytes, String>stream("stream-topic").join(mappedKTable,
MockValueJoiner.TOSTRING_JOINER);
+ final KTable<Bytes, String> mappedKTable = builder
+ .<Bytes, String>table("table-topic")
+ .mapValues(MockMapper.noOpValueMapper(), Materialized.as("store"));
+ builder
+ .<Bytes, String>stream("stream-topic")
+ .join(mappedKTable, MockValueJoiner.TOSTRING_JOINER);
builder.build();
- final ProcessorTopology topology =
builder.internalTopologyBuilder.rewriteTopology(new
StreamsConfig(props)).build();
-
- assertThat(topology.stateStores().size(), equalTo(1));
-
assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"),
equalTo(Collections.singleton("store")));
-
assertThat(topology.processorConnectedStateStores("KTABLE-MAPVALUES-0000000003"),
equalTo(Collections.singleton("store")));
+ final ProcessorTopology topology =
+ builder.internalTopologyBuilder.rewriteTopology(new
StreamsConfig(props)).build();
+
+ assertThat(
+ topology.stateStores().size(),
+ equalTo(1));
+ assertThat(
+ topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"),
+ equalTo(Collections.singleton("store")));
+ assertThat(
+
topology.processorConnectedStateStores("KTABLE-MAPVALUES-0000000003"),
+ equalTo(Collections.singleton("store")));
}
@Test
public void shouldAllowJoinUnmaterializedJoinedKTable() {
final KTable<Bytes, String> table1 = builder.table("table-topic1");
final KTable<Bytes, String> table2 = builder.table("table-topic2");
- builder.<Bytes, String>stream("stream-topic").join(table1.join(table2,
MockValueJoiner.TOSTRING_JOINER), MockValueJoiner.TOSTRING_JOINER);
+ builder
+ .<Bytes, String>stream("stream-topic")
+ .join(table1.join(table2, MockValueJoiner.TOSTRING_JOINER),
MockValueJoiner.TOSTRING_JOINER);
builder.build();
- final ProcessorTopology topology =
builder.internalTopologyBuilder.rewriteTopology(new
StreamsConfig(props)).build();
-
- assertThat(topology.stateStores().size(), equalTo(2));
-
assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000010"),
equalTo(Utils.mkSet(topology.stateStores().get(0).name(),
topology.stateStores().get(1).name())));
-
assertThat(topology.processorConnectedStateStores("KTABLE-MERGE-0000000007").isEmpty(),
is(true));
+ final ProcessorTopology topology =
+ builder.internalTopologyBuilder.rewriteTopology(new
StreamsConfig(props)).build();
+
+ assertThat(
+ topology.stateStores().size(),
+ equalTo(2));
+ assertThat(
+ topology.processorConnectedStateStores("KSTREAM-JOIN-0000000010"),
+ equalTo(Utils.mkSet(topology.stateStores().get(0).name(),
topology.stateStores().get(1).name())));
+ assertTrue(
+
topology.processorConnectedStateStores("KTABLE-MERGE-0000000007").isEmpty());
}
@Test
public void shouldAllowJoinMaterializedJoinedKTable() {
final KTable<Bytes, String> table1 = builder.table("table-topic1");
final KTable<Bytes, String> table2 = builder.table("table-topic2");
- builder.<Bytes, String>stream("stream-topic").join(table1.join(table2,
MockValueJoiner.TOSTRING_JOINER, Materialized.as("store")),
MockValueJoiner.TOSTRING_JOINER);
+ builder
+ .<Bytes, String>stream("stream-topic")
+ .join(
+ table1.join(table2, MockValueJoiner.TOSTRING_JOINER,
Materialized.as("store")),
+ MockValueJoiner.TOSTRING_JOINER);
builder.build();
- final ProcessorTopology topology =
builder.internalTopologyBuilder.rewriteTopology(new
StreamsConfig(props)).build();
-
- assertThat(topology.stateStores().size(), equalTo(3));
-
assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000010"),
equalTo(Collections.singleton("store")));
-
assertThat(topology.processorConnectedStateStores("KTABLE-MERGE-0000000007"),
equalTo(Collections.singleton("store")));
+ final ProcessorTopology topology =
+ builder.internalTopologyBuilder.rewriteTopology(new
StreamsConfig(props)).build();
+
+ assertThat(
+ topology.stateStores().size(),
+ equalTo(3));
+ assertThat(
+ topology.processorConnectedStateStores("KSTREAM-JOIN-0000000010"),
+ equalTo(Collections.singleton("store")));
+ assertThat(
+ topology.processorConnectedStateStores("KTABLE-MERGE-0000000007"),
+ equalTo(Collections.singleton("store")));
}
@Test
@@ -150,11 +209,18 @@ public class StreamsBuilderTest {
builder.<Bytes, String>stream("stream-topic").join(table,
MockValueJoiner.TOSTRING_JOINER);
builder.build();
- final ProcessorTopology topology =
builder.internalTopologyBuilder.rewriteTopology(new
StreamsConfig(props)).build();
-
- assertThat(topology.stateStores().size(), equalTo(1));
-
assertThat(topology.processorConnectedStateStores("KTABLE-SOURCE-0000000002"),
equalTo(Collections.singleton(topology.stateStores().get(0).name())));
-
assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000004"),
equalTo(Collections.singleton(topology.stateStores().get(0).name())));
+ final ProcessorTopology topology =
+ builder.internalTopologyBuilder.rewriteTopology(new
StreamsConfig(props)).build();
+
+ assertThat(
+ topology.stateStores().size(),
+ equalTo(1));
+ assertThat(
+ topology.processorConnectedStateStores("KTABLE-SOURCE-0000000002"),
+
equalTo(Collections.singleton(topology.stateStores().get(0).name())));
+ assertThat(
+ topology.processorConnectedStateStores("KSTREAM-JOIN-0000000004"),
+
equalTo(Collections.singleton(topology.stateStores().get(0).name())));
}
@Test
@@ -163,16 +229,17 @@ public class StreamsBuilderTest {
source.to("topic-sink");
final MockProcessorSupplier<String, String> processorSupplier = new
MockProcessorSupplier<>();
-
source.process(processorSupplier);
+ final ConsumerRecordFactory<String, String> recordFactory =
+ new ConsumerRecordFactory<>(new StringSerializer(), new
StringSerializer());
+
try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), props)) {
- final ConsumerRecordFactory<String, String> recordFactory = new
ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
driver.pipeInput(recordFactory.create("topic-source", "A", "aa"));
}
// no exception was thrown
- assertEquals(asList("A:aa"),
processorSupplier.theCapturedProcessor().processed);
+ assertEquals(Collections.singletonList("A:aa"),
processorSupplier.theCapturedProcessor().processed);
}
@Test
@@ -181,18 +248,20 @@ public class StreamsBuilderTest {
final KStream<String, String> through = source.through("topic-sink");
final MockProcessorSupplier<String, String> sourceProcessorSupplier =
new MockProcessorSupplier<>();
- final MockProcessorSupplier<String, String> throughProcessorSupplier =
new MockProcessorSupplier<>();
-
source.process(sourceProcessorSupplier);
+
+ final MockProcessorSupplier<String, String> throughProcessorSupplier =
new MockProcessorSupplier<>();
through.process(throughProcessorSupplier);
+ final ConsumerRecordFactory<String, String> recordFactory =
+ new ConsumerRecordFactory<>(new StringSerializer(), new
StringSerializer());
+
try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), props)) {
- final ConsumerRecordFactory<String, String> recordFactory = new
ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
driver.pipeInput(recordFactory.create("topic-source", "A", "aa"));
}
- assertEquals(asList("A:aa"),
sourceProcessorSupplier.theCapturedProcessor().processed);
- assertEquals(asList("A:aa"),
throughProcessorSupplier.theCapturedProcessor().processed);
+ assertEquals(Collections.singletonList("A:aa"),
sourceProcessorSupplier.theCapturedProcessor().processed);
+ assertEquals(Collections.singletonList("A:aa"),
throughProcessorSupplier.theCapturedProcessor().processed);
}
@Test
@@ -207,7 +276,9 @@ public class StreamsBuilderTest {
final MockProcessorSupplier<String, String> processorSupplier = new
MockProcessorSupplier<>();
merged.process(processorSupplier);
- final ConsumerRecordFactory<String, String> recordFactory = new
ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+ final ConsumerRecordFactory<String, String> recordFactory =
+ new ConsumerRecordFactory<>(new StringSerializer(), new
StringSerializer());
+
try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), props)) {
driver.pipeInput(recordFactory.create(topic1, "A", "aa"));
driver.pipeInput(recordFactory.create(topic2, "B", "bb"));
@@ -222,18 +293,15 @@ public class StreamsBuilderTest {
public void shouldUseSerdesDefinedInMaterializedToConsumeTable() {
final Map<Long, String> results = new HashMap<>();
final String topic = "topic";
- final ForeachAction<Long, String> action = new ForeachAction<Long,
String>() {
- @Override
- public void apply(final Long key, final String value) {
- results.put(key, value);
- }
- };
+ final ForeachAction<Long, String> action = results::put;
builder.table(topic, Materialized.<Long, String, KeyValueStore<Bytes,
byte[]>>as("store")
.withKeySerde(Serdes.Long())
.withValueSerde(Serdes.String()))
.toStream().foreach(action);
- final ConsumerRecordFactory<Long, String> recordFactory = new
ConsumerRecordFactory<>(new LongSerializer(), new StringSerializer());
+ final ConsumerRecordFactory<Long, String> recordFactory =
+ new ConsumerRecordFactory<>(new LongSerializer(), new
StringSerializer());
+
try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), props)) {
driver.pipeInput(recordFactory.create(topic, 1L, "value1"));
driver.pipeInput(recordFactory.create(topic, 2L, "value2"));
@@ -253,7 +321,9 @@ public class StreamsBuilderTest {
.withKeySerde(Serdes.Long())
.withValueSerde(Serdes.String()));
- final ConsumerRecordFactory<Long, String> recordFactory = new
ConsumerRecordFactory<>(new LongSerializer(), new StringSerializer());
+ final ConsumerRecordFactory<Long, String> recordFactory =
+ new ConsumerRecordFactory<>(new LongSerializer(), new
StringSerializer());
+
try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), props)) {
driver.pipeInput(recordFactory.create(topic, 1L, "value1"));
driver.pipeInput(recordFactory.create(topic, 2L, "value2"));
@@ -269,7 +339,8 @@ public class StreamsBuilderTest {
final String topic = "topic";
builder.table(topic, Materialized.with(Serdes.Long(),
Serdes.String()));
- final ProcessorTopology topology =
builder.internalTopologyBuilder.rewriteTopology(new
StreamsConfig(props)).build();
+ final ProcessorTopology topology =
+ builder.internalTopologyBuilder.rewriteTopology(new
StreamsConfig(props)).build();
assertThat(topology.stateStores().size(), equalTo(0));
}
@@ -285,13 +356,18 @@ public class StreamsBuilderTest {
final InternalTopologyBuilder internalTopologyBuilder =
TopologyWrapper.getInternalTopologyBuilder(topology);
internalTopologyBuilder.rewriteTopology(new StreamsConfig(props));
- assertThat(internalTopologyBuilder.build().storeToChangelogTopic(),
equalTo(Collections.singletonMap("store", "topic")));
-
- assertThat(internalTopologyBuilder.getStateStores().keySet(),
equalTo(Collections.singleton("store")));
-
-
assertThat(internalTopologyBuilder.getStateStores().get("store").loggingEnabled(),
equalTo(false));
-
-
assertThat(internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.isEmpty(),
equalTo(true));
+ assertThat(
+ internalTopologyBuilder.build().storeToChangelogTopic(),
+ equalTo(Collections.singletonMap("store", "topic")));
+ assertThat(
+ internalTopologyBuilder.getStateStores().keySet(),
+ equalTo(Collections.singleton("store")));
+ assertThat(
+
internalTopologyBuilder.getStateStores().get("store").loggingEnabled(),
+ equalTo(false));
+ assertThat(
+
internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.isEmpty(),
+ equalTo(true));
}
@Test
@@ -302,24 +378,29 @@ public class StreamsBuilderTest {
final InternalTopologyBuilder internalTopologyBuilder =
TopologyWrapper.getInternalTopologyBuilder(builder.build());
internalTopologyBuilder.setApplicationId("appId");
- assertThat(internalTopologyBuilder.build().storeToChangelogTopic(),
equalTo(Collections.singletonMap("store", "appId-store-changelog")));
-
- assertThat(internalTopologyBuilder.getStateStores().keySet(),
equalTo(Collections.singleton("store")));
-
-
assertThat(internalTopologyBuilder.getStateStores().get("store").loggingEnabled(),
equalTo(true));
-
-
assertThat(internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.keySet(),
equalTo(Collections.singleton("appId-store-changelog")));
+ assertThat(
+ internalTopologyBuilder.build().storeToChangelogTopic(),
+ equalTo(Collections.singletonMap("store",
"appId-store-changelog")));
+ assertThat(
+ internalTopologyBuilder.getStateStores().keySet(),
+ equalTo(Collections.singleton("store")));
+ assertThat(
+
internalTopologyBuilder.getStateStores().get("store").loggingEnabled(),
+ equalTo(true));
+ assertThat(
+
internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.keySet(),
+ equalTo(Collections.singleton("appId-store-changelog")));
}
@Test(expected = TopologyException.class)
public void shouldThrowExceptionWhenNoTopicPresent() {
- builder.stream(Collections.<String>emptyList());
+ builder.stream(Collections.emptyList());
builder.build();
}
@Test(expected = NullPointerException.class)
public void shouldThrowExceptionWhenTopicNamesAreNull() {
- builder.stream(Arrays.<String>asList(null, null));
+ builder.stream(Arrays.asList(null, null));
builder.build();
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 046ffb0..7ea9b2b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -374,8 +374,8 @@ public class TopologyTest {
@Test
public void sinkShouldReturnNullTopicWithDynamicRouting() {
- final TopologyDescription.Sink expectedSinkNode
- = new InternalTopologyBuilder.Sink("sink", (key, value,
record) -> record.topic() + "-" + key);
+ final TopologyDescription.Sink expectedSinkNode =
+ new InternalTopologyBuilder.Sink("sink", (key, value, record) ->
record.topic() + "-" + key);
assertThat(expectedSinkNode.topic(), equalTo(null));
}
@@ -383,8 +383,8 @@ public class TopologyTest {
@Test
public void sinkShouldReturnTopicNameExtractorWithDynamicRouting() {
final TopicNameExtractor topicNameExtractor = (key, value, record) ->
record.topic() + "-" + key;
- final TopologyDescription.Sink expectedSinkNode
- = new InternalTopologyBuilder.Sink("sink", topicNameExtractor);
+ final TopologyDescription.Sink expectedSinkNode =
+ new InternalTopologyBuilder.Sink("sink", topicNameExtractor);
assertThat(expectedSinkNode.topicNameExtractor(),
equalTo(topicNameExtractor));
}
@@ -459,8 +459,8 @@ public class TopologyTest {
public void sourceAndProcessorWithStateShouldHaveSingleSubtopology() {
final TopologyDescription.Source expectedSourceNode =
addSource("source", "topic");
final String[] store = new String[] {"store"};
- final TopologyDescription.Processor expectedProcessorNode
- = addProcessorWithNewStore("processor", store, expectedSourceNode);
+ final TopologyDescription.Processor expectedProcessorNode =
+ addProcessorWithNewStore("processor", store, expectedSourceNode);
final Set<TopologyDescription.Node> allNodes = new HashSet<>();
allNodes.add(expectedSourceNode);
@@ -475,8 +475,8 @@ public class TopologyTest {
public void
sourceAndProcessorWithMultipleStatesShouldHaveSingleSubtopology() {
final TopologyDescription.Source expectedSourceNode =
addSource("source", "topic");
final String[] stores = new String[] {"store1", "store2"};
- final TopologyDescription.Processor expectedProcessorNode
- = addProcessorWithNewStore("processor", stores,
expectedSourceNode);
+ final TopologyDescription.Processor expectedProcessorNode =
+ addProcessorWithNewStore("processor", stores, expectedSourceNode);
final Set<TopologyDescription.Node> allNodes = new HashSet<>();
allNodes.add(expectedSourceNode);
@@ -612,16 +612,16 @@ public class TopologyTest {
final String[] bothStores = new String[] {store1[0], store2[0]};
final TopologyDescription.Source expectedSourceNode1 =
addSource("source", "topic");
- final TopologyDescription.Processor expectedProcessorNode1
- = addProcessorWithNewStore("processor1", store1,
expectedSourceNode1);
+ final TopologyDescription.Processor expectedProcessorNode1 =
+ addProcessorWithNewStore("processor1", store1,
expectedSourceNode1);
final TopologyDescription.Source expectedSourceNode2 =
addSource("source2", "topic2");
- final TopologyDescription.Processor expectedProcessorNode2
- = addProcessorWithNewStore("processor2", store2,
expectedSourceNode2);
+ final TopologyDescription.Processor expectedProcessorNode2 =
+ addProcessorWithNewStore("processor2", store2,
expectedSourceNode2);
final TopologyDescription.Source expectedSourceNode3 =
addSource("source3", "topic3");
- final TopologyDescription.Processor expectedProcessorNode3
- = addProcessorWithExistingStore("processor3", bothStores,
expectedSourceNode3);
+ final TopologyDescription.Processor expectedProcessorNode3 =
+ addProcessorWithExistingStore("processor3", bothStores,
expectedSourceNode3);
final Set<TopologyDescription.Node> allNodes = new HashSet<>();
allNodes.add(expectedSourceNode1);
@@ -1138,8 +1138,8 @@ public class TopologyTest {
} else {
topology.connectProcessorAndStateStores(processorName, storeNames);
}
- final TopologyDescription.Processor expectedProcessorNode
- = new InternalTopologyBuilder.Processor(processorName, new
HashSet<>(Arrays.asList(storeNames)));
+ final TopologyDescription.Processor expectedProcessorNode =
+ new InternalTopologyBuilder.Processor(processorName, new
HashSet<>(Arrays.asList(storeNames)));
for (final TopologyDescription.Node parent : parents) {
((InternalTopologyBuilder.AbstractNode)
parent).addSuccessor(expectedProcessorNode);
@@ -1158,8 +1158,8 @@ public class TopologyTest {
}
topology.addSink(sinkName, sinkTopic, null, null, null, parentNames);
- final TopologyDescription.Sink expectedSinkNode
- = new InternalTopologyBuilder.Sink(sinkName, sinkTopic);
+ final TopologyDescription.Sink expectedSinkNode =
+ new InternalTopologyBuilder.Sink(sinkName, sinkTopic);
for (final TopologyDescription.Node parent : parents) {
((InternalTopologyBuilder.AbstractNode)
parent).addSuccessor(expectedSinkNode);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index b2bd0a8..bdfbb3b 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -185,21 +185,21 @@ public class EosIntegrationTest {
CLUSTER.time
);
- final List<KeyValue<Long, Long>> committedRecords
- = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
- TestUtils.consumerConfig(
- CLUSTER.bootstrapServers(),
- CONSUMER_GROUP_ID,
- LongDeserializer.class,
- LongDeserializer.class,
- new Properties() {
- {
- put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
- }
- }),
- outputTopic,
- inputData.size()
- );
+ final List<KeyValue<Long, Long>> committedRecords =
+ IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+ TestUtils.consumerConfig(
+ CLUSTER.bootstrapServers(),
+ CONSUMER_GROUP_ID,
+ LongDeserializer.class,
+ LongDeserializer.class,
+ new Properties() {
+ {
+ put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
+ }
+ }),
+ outputTopic,
+ inputData.size()
+ );
checkResultPerKey(committedRecords, inputData);
} finally {
@@ -273,21 +273,21 @@ public class EosIntegrationTest {
CLUSTER.time
);
- final List<KeyValue<Long, Long>> firstCommittedRecords
- = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
- TestUtils.consumerConfig(
- CLUSTER.bootstrapServers(),
- CONSUMER_GROUP_ID,
- LongDeserializer.class,
- LongDeserializer.class,
- new Properties() {
- {
- put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
- }
- }),
- SINGLE_PARTITION_OUTPUT_TOPIC,
- firstBurstOfData.size()
- );
+ final List<KeyValue<Long, Long>> firstCommittedRecords =
+ IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+ TestUtils.consumerConfig(
+ CLUSTER.bootstrapServers(),
+ CONSUMER_GROUP_ID,
+ LongDeserializer.class,
+ LongDeserializer.class,
+ new Properties() {
+ {
+ put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
+ }
+ }),
+ SINGLE_PARTITION_OUTPUT_TOPIC,
+ firstBurstOfData.size()
+ );
assertThat(firstCommittedRecords, equalTo(firstBurstOfData));
@@ -298,21 +298,21 @@ public class EosIntegrationTest {
CLUSTER.time
);
- final List<KeyValue<Long, Long>> secondCommittedRecords
- = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
- TestUtils.consumerConfig(
- CLUSTER.bootstrapServers(),
- CONSUMER_GROUP_ID,
- LongDeserializer.class,
- LongDeserializer.class,
- new Properties() {
- {
- put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
- }
- }),
- SINGLE_PARTITION_OUTPUT_TOPIC,
- secondBurstOfData.size()
- );
+ final List<KeyValue<Long, Long>> secondCommittedRecords =
+ IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+ TestUtils.consumerConfig(
+ CLUSTER.bootstrapServers(),
+ CONSUMER_GROUP_ID,
+ LongDeserializer.class,
+ LongDeserializer.class,
+ new Properties() {
+ {
+ put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
+ }
+ }),
+ SINGLE_PARTITION_OUTPUT_TOPIC,
+ secondBurstOfData.size()
+ );
assertThat(secondCommittedRecords, equalTo(secondBurstOfData));
} finally {
@@ -593,9 +593,9 @@ public class EosIntegrationTest {
String[] storeNames = null;
if (withState) {
storeNames = new String[] {storeName};
- final StoreBuilder<KeyValueStore<Long, Long>> storeBuilder
- =
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName),
Serdes.Long(), Serdes.Long())
- .withCachingEnabled();
+ final StoreBuilder<KeyValueStore<Long, Long>> storeBuilder = Stores
+
.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), Serdes.Long(),
Serdes.Long())
+ .withCachingEnabled();
builder.addStateStore(storeBuilder);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 04cc0e1..3ba8f09 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -628,8 +628,8 @@ public class KStreamAggregationIntegrationTest {
startStreams();
latch.await(30, TimeUnit.SECONDS);
- final ReadOnlySessionStore<String, String> sessionStore
- = kafkaStreams.store(userSessionsStore,
QueryableStoreTypes.sessionStore());
+ final ReadOnlySessionStore<String, String> sessionStore =
+ kafkaStreams.store(userSessionsStore,
QueryableStoreTypes.sessionStore());
// verify correct data received
assertThat(results.get(new Windowed<>("bob", new SessionWindow(t1,
t1))), equalTo("start"));
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index f2fc4f8..9bdea13 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -22,20 +22,15 @@ import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
-import org.apache.kafka.streams.kstream.Aggregator;
-import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
-import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Merger;
-import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
@@ -43,7 +38,6 @@ import org.apache.kafka.streams.kstream.Windows;
import
org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
-import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
@@ -76,14 +70,14 @@ public class KGroupedStreamImplTest {
private final StreamsBuilder builder = new StreamsBuilder();
private KGroupedStream<String, String> groupedStream;
- private final ConsumerRecordFactory<String, String> recordFactory = new
ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+ private final ConsumerRecordFactory<String, String> recordFactory =
+ new ConsumerRecordFactory<>(new StringSerializer(), new
StringSerializer());
private final Properties props =
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
@Before
public void before() {
final KStream<String, String> stream = builder.stream(TOPIC,
Consumed.with(Serdes.String(), Serdes.String()));
groupedStream = stream.groupByKey(Grouped.with(Serdes.String(),
Serdes.String()));
-
}
@Test(expected = NullPointerException.class)
@@ -93,12 +87,14 @@ public class KGroupedStreamImplTest {
@Test(expected = InvalidTopicException.class)
public void shouldNotHaveInvalidStoreNameOnReduce() {
- groupedStream.reduce(MockReducer.STRING_ADDER, Materialized.<String,
String, KeyValueStore<Bytes, byte[]>>as(INVALID_STORE_NAME));
+ groupedStream.reduce(MockReducer.STRING_ADDER,
Materialized.as(INVALID_STORE_NAME));
}
@Test(expected = NullPointerException.class)
public void shouldNotHaveNullReducerWithWindowedReduce() {
- groupedStream.windowedBy(TimeWindows.of(ofMillis(10))).reduce(null,
Materialized.<String, String, WindowStore<Bytes, byte[]>>as("store"));
+ groupedStream
+ .windowedBy(TimeWindows.of(ofMillis(10)))
+ .reduce(null, Materialized.as("store"));
}
@Test(expected = NullPointerException.class)
@@ -108,32 +104,41 @@ public class KGroupedStreamImplTest {
@Test(expected = InvalidTopicException.class)
public void shouldNotHaveInvalidStoreNameWithWindowedReduce() {
-
groupedStream.windowedBy(TimeWindows.of(ofMillis(10))).reduce(MockReducer.STRING_ADDER,
Materialized.<String, String, WindowStore<Bytes,
byte[]>>as(INVALID_STORE_NAME));
+ groupedStream
+ .windowedBy(TimeWindows.of(ofMillis(10)))
+ .reduce(MockReducer.STRING_ADDER,
Materialized.as(INVALID_STORE_NAME));
}
@Test(expected = NullPointerException.class)
public void shouldNotHaveNullInitializerOnAggregate() {
- groupedStream.aggregate(null, MockAggregator.TOSTRING_ADDER,
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store"));
+ groupedStream.aggregate(null, MockAggregator.TOSTRING_ADDER,
Materialized.as("store"));
}
@Test(expected = NullPointerException.class)
public void shouldNotHaveNullAdderOnAggregate() {
- groupedStream.aggregate(MockInitializer.STRING_INIT, null,
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store"));
+ groupedStream.aggregate(MockInitializer.STRING_INIT, null,
Materialized.as("store"));
}
@Test(expected = InvalidTopicException.class)
public void shouldNotHaveInvalidStoreNameOnAggregate() {
- groupedStream.aggregate(MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER, Materialized.<String, String,
KeyValueStore<Bytes, byte[]>>as(INVALID_STORE_NAME));
+ groupedStream.aggregate(
+ MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
+ Materialized.as(INVALID_STORE_NAME));
}
@Test(expected = NullPointerException.class)
public void shouldNotHaveNullInitializerOnWindowedAggregate() {
- groupedStream.windowedBy(TimeWindows.of(ofMillis(10))).aggregate(null,
MockAggregator.TOSTRING_ADDER, Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("store"));
+ groupedStream
+ .windowedBy(TimeWindows.of(ofMillis(10)))
+ .aggregate(null, MockAggregator.TOSTRING_ADDER,
Materialized.as("store"));
}
@Test(expected = NullPointerException.class)
public void shouldNotHaveNullAdderOnWindowedAggregate() {
-
groupedStream.windowedBy(TimeWindows.of(ofMillis(10))).aggregate(MockInitializer.STRING_INIT,
null, Materialized.<String, String, WindowStore<Bytes, byte[]>>as("store"));
+ groupedStream
+ .windowedBy(TimeWindows.of(ofMillis(10)))
+ .aggregate(MockInitializer.STRING_INIT, null,
Materialized.as("store"));
}
@Test(expected = NullPointerException.class)
@@ -143,7 +148,9 @@ public class KGroupedStreamImplTest {
@Test(expected = InvalidTopicException.class)
public void shouldNotHaveInvalidStoreNameOnWindowedAggregate() {
-
groupedStream.windowedBy(TimeWindows.of(ofMillis(10))).aggregate(MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER, Materialized.<String, String,
WindowStore<Bytes, byte[]>>as(INVALID_STORE_NAME));
+ groupedStream
+ .windowedBy(TimeWindows.of(ofMillis(10)))
+ .aggregate(MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER, Materialized.as(INVALID_STORE_NAME));
}
private void doAggregateSessionWindows(final Map<Windowed<String>,
Integer> results) {
@@ -163,28 +170,16 @@ public class KGroupedStreamImplTest {
@Test
public void shouldAggregateSessionWindows() {
final Map<Windowed<String>, Integer> results = new HashMap<>();
- final KTable<Windowed<String>, Integer> table =
groupedStream.windowedBy(SessionWindows.with(ofMillis(30))).aggregate(new
Initializer<Integer>() {
- @Override
- public Integer apply() {
- return 0;
- }
- }, new Aggregator<String, String, Integer>() {
- @Override
- public Integer apply(final String aggKey, final String value,
final Integer aggregate) {
- return aggregate + 1;
- }
- }, new Merger<String, Integer>() {
- @Override
- public Integer apply(final String aggKey, final Integer aggOne,
final Integer aggTwo) {
- return aggOne + aggTwo;
- }
- }, Materialized.<String, Integer, SessionStore<Bytes,
byte[]>>as("session-store").withValueSerde(Serdes.Integer()));
- table.toStream().foreach(new ForeachAction<Windowed<String>,
Integer>() {
- @Override
- public void apply(final Windowed<String> key, final Integer value)
{
- results.put(key, value);
- }
- });
+ final KTable<Windowed<String>, Integer> table = groupedStream
+ .windowedBy(SessionWindows.with(ofMillis(30)))
+ .aggregate(
+ () -> 0,
+ (aggKey, value, aggregate) -> aggregate + 1,
+ (aggKey, aggOne, aggTwo) -> aggOne + aggTwo,
+ Materialized
+ .<String, Integer, SessionStore<Bytes,
byte[]>>as("session-store").
+ withValueSerde(Serdes.Integer()));
+ table.toStream().foreach(results::put);
doAggregateSessionWindows(results);
assertEquals(table.queryableStoreName(), "session-store");
@@ -193,28 +188,14 @@ public class KGroupedStreamImplTest {
@Test
public void shouldAggregateSessionWindowsWithInternalStoreName() {
final Map<Windowed<String>, Integer> results = new HashMap<>();
- final KTable<Windowed<String>, Integer> table =
groupedStream.windowedBy(SessionWindows.with(ofMillis(30))).aggregate(new
Initializer<Integer>() {
- @Override
- public Integer apply() {
- return 0;
- }
- }, new Aggregator<String, String, Integer>() {
- @Override
- public Integer apply(final String aggKey, final String value,
final Integer aggregate) {
- return aggregate + 1;
- }
- }, new Merger<String, Integer>() {
- @Override
- public Integer apply(final String aggKey, final Integer aggOne,
final Integer aggTwo) {
- return aggOne + aggTwo;
- }
- }, Materialized.<String, Integer, SessionStore<Bytes,
byte[]>>with(null, Serdes.Integer()));
- table.toStream().foreach(new ForeachAction<Windowed<String>,
Integer>() {
- @Override
- public void apply(final Windowed<String> key, final Integer value)
{
- results.put(key, value);
- }
- });
+ final KTable<Windowed<String>, Integer> table = groupedStream
+ .windowedBy(SessionWindows.with(ofMillis(30)))
+ .aggregate(
+ () -> 0,
+ (aggKey, value, aggregate) -> aggregate + 1,
+ (aggKey, aggOne, aggTwo) -> aggOne + aggTwo,
+ Materialized.with(null, Serdes.Integer()));
+ table.toStream().foreach(results::put);
doAggregateSessionWindows(results);
}
@@ -236,14 +217,10 @@ public class KGroupedStreamImplTest {
@Test
public void shouldCountSessionWindows() {
final Map<Windowed<String>, Long> results = new HashMap<>();
- final KTable<Windowed<String>, Long> table =
groupedStream.windowedBy(SessionWindows.with(ofMillis(30)))
- .count(Materialized.<String, Long, SessionStore<Bytes,
byte[]>>as("session-store"));
- table.toStream().foreach(new ForeachAction<Windowed<String>, Long>() {
- @Override
- public void apply(final Windowed<String> key, final Long value) {
- results.put(key, value);
- }
- });
+ final KTable<Windowed<String>, Long> table = groupedStream
+ .windowedBy(SessionWindows.with(ofMillis(30)))
+ .count(Materialized.as("session-store"));
+ table.toStream().foreach(results::put);
doCountSessionWindows(results);
assertEquals(table.queryableStoreName(), "session-store");
}
@@ -251,13 +228,10 @@ public class KGroupedStreamImplTest {
@Test
public void shouldCountSessionWindowsWithInternalStoreName() {
final Map<Windowed<String>, Long> results = new HashMap<>();
- final KTable<Windowed<String>, Long> table =
groupedStream.windowedBy(SessionWindows.with(ofMillis(30))).count();
- table.toStream().foreach(new ForeachAction<Windowed<String>, Long>() {
- @Override
- public void apply(final Windowed<String> key, final Long value) {
- results.put(key, value);
- }
- });
+ final KTable<Windowed<String>, Long> table = groupedStream
+ .windowedBy(SessionWindows.with(ofMillis(30)))
+ .count();
+ table.toStream().foreach(results::put);
doCountSessionWindows(results);
assertNull(table.queryableStoreName());
}
@@ -279,19 +253,10 @@ public class KGroupedStreamImplTest {
@Test
public void shouldReduceSessionWindows() {
final Map<Windowed<String>, String> results = new HashMap<>();
- final KTable<Windowed<String>, String> table =
groupedStream.windowedBy(SessionWindows.with(ofMillis(30)))
- .reduce(new Reducer<String>() {
- @Override
- public String apply(final String value1, final String
value2) {
- return value1 + ":" + value2;
- }
- }, Materialized.<String, String, SessionStore<Bytes,
byte[]>>as("session-store"));
- table.toStream().foreach(new ForeachAction<Windowed<String>, String>()
{
- @Override
- public void apply(final Windowed<String> key, final String value) {
- results.put(key, value);
- }
- });
+ final KTable<Windowed<String>, String> table = groupedStream
+ .windowedBy(SessionWindows.with(ofMillis(30)))
+ .reduce((value1, value2) -> value1 + ":" + value2,
Materialized.as("session-store"));
+ table.toStream().foreach(results::put);
doReduceSessionWindows(results);
assertEquals(table.queryableStoreName(), "session-store");
}
@@ -299,26 +264,19 @@ public class KGroupedStreamImplTest {
@Test
public void shouldReduceSessionWindowsWithInternalStoreName() {
final Map<Windowed<String>, String> results = new HashMap<>();
- final KTable<Windowed<String>, String> table =
groupedStream.windowedBy(SessionWindows.with(ofMillis(30)))
- .reduce(new Reducer<String>() {
- @Override
- public String apply(final String value1, final String
value2) {
- return value1 + ":" + value2;
- }
- });
- table.toStream().foreach(new ForeachAction<Windowed<String>, String>()
{
- @Override
- public void apply(final Windowed<String> key, final String value) {
- results.put(key, value);
- }
- });
+ final KTable<Windowed<String>, String> table = groupedStream
+ .windowedBy(SessionWindows.with(ofMillis(30)))
+ .reduce((value1, value2) -> value1 + ":" + value2);
+ table.toStream().foreach(results::put);
doReduceSessionWindows(results);
assertNull(table.queryableStoreName());
}
@Test(expected = NullPointerException.class)
public void shouldNotAcceptNullReducerWhenReducingSessionWindows() {
-
groupedStream.windowedBy(SessionWindows.with(ofMillis(30))).reduce(null,
Materialized.<String, String, SessionStore<Bytes, byte[]>>as("store"));
+ groupedStream
+ .windowedBy(SessionWindows.with(ofMillis(30)))
+ .reduce(null, Materialized.as("store"));
}
@Test(expected = NullPointerException.class)
@@ -328,39 +286,51 @@ public class KGroupedStreamImplTest {
@Test(expected = InvalidTopicException.class)
public void shouldNotAcceptInvalidStoreNameWhenReducingSessionWindows() {
-
groupedStream.windowedBy(SessionWindows.with(ofMillis(30))).reduce(MockReducer.STRING_ADDER,
Materialized.<String, String, SessionStore<Bytes,
byte[]>>as(INVALID_STORE_NAME));
+ groupedStream
+ .windowedBy(SessionWindows.with(ofMillis(30)))
+ .reduce(MockReducer.STRING_ADDER,
Materialized.as(INVALID_STORE_NAME));
}
@Test(expected = NullPointerException.class)
public void
shouldNotAcceptNullStateStoreSupplierWhenReducingSessionWindows() {
-
groupedStream.windowedBy(SessionWindows.with(ofMillis(30))).reduce(null,
Materialized.<String, String, SessionStore<Bytes, byte[]>>as(null));
+ groupedStream
+ .windowedBy(SessionWindows.with(ofMillis(30)))
+ .reduce(
+ null,
+ Materialized.<String, String, SessionStore<Bytes,
byte[]>>as(null));
}
@Test(expected = NullPointerException.class)
public void shouldNotAcceptNullInitializerWhenAggregatingSessionWindows() {
-
groupedStream.windowedBy(SessionWindows.with(ofMillis(30))).aggregate(null,
MockAggregator.TOSTRING_ADDER, new Merger<String, String>() {
- @Override
- public String apply(final String aggKey, final String aggOne,
final String aggTwo) {
- return null;
- }
- }, Materialized.<String, String, SessionStore<Bytes,
byte[]>>as("storeName"));
+ groupedStream
+ .windowedBy(SessionWindows.with(ofMillis(30)))
+ .aggregate(
+ null,
+ MockAggregator.TOSTRING_ADDER,
+ (aggKey, aggOne, aggTwo) -> null,
+ Materialized.as("storeName"));
}
@Test(expected = NullPointerException.class)
public void shouldNotAcceptNullAggregatorWhenAggregatingSessionWindows() {
-
groupedStream.windowedBy(SessionWindows.with(ofMillis(30))).aggregate(MockInitializer.STRING_INIT,
null, new Merger<String, String>() {
- @Override
- public String apply(final String aggKey, final String aggOne,
final String aggTwo) {
- return null;
- }
- }, Materialized.<String, String, SessionStore<Bytes,
byte[]>>as("storeName"));
+ groupedStream.
+ windowedBy(SessionWindows.with(ofMillis(30)))
+ .aggregate(
+ MockInitializer.STRING_INIT,
+ null,
+ (aggKey, aggOne, aggTwo) -> null,
+ Materialized.as("storeName"));
}
@Test(expected = NullPointerException.class)
public void
shouldNotAcceptNullSessionMergerWhenAggregatingSessionWindows() {
-
groupedStream.windowedBy(SessionWindows.with(ofMillis(30))).aggregate(MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER,
+ groupedStream
+ .windowedBy(SessionWindows.with(ofMillis(30)))
+ .aggregate(
+ MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
null,
- Materialized.<String, String, SessionStore<Bytes,
byte[]>>as("storeName"));
+ Materialized.as("storeName"));
}
@Test(expected = NullPointerException.class)
@@ -370,24 +340,24 @@ public class KGroupedStreamImplTest {
@Test
public void shouldAcceptNullStoreNameWhenAggregatingSessionWindows() {
- groupedStream.windowedBy(SessionWindows.with(ofMillis(10)))
- .aggregate(MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER, new Merger<String, String>() {
- @Override
- public String apply(final String aggKey, final String
aggOne, final String aggTwo) {
- return null;
- }
- }, Materialized.<String, String, SessionStore<Bytes,
byte[]>>with(Serdes.String(), Serdes.String()));
+ groupedStream
+ .windowedBy(SessionWindows.with(ofMillis(10)))
+ .aggregate(
+ MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
+ (aggKey, aggOne, aggTwo) -> null,
+ Materialized.with(Serdes.String(), Serdes.String()));
}
@Test(expected = InvalidTopicException.class)
public void shouldNotAcceptInvalidStoreNameWhenAggregatingSessionWindows()
{
- groupedStream.windowedBy(SessionWindows.with(ofMillis(10)))
- .aggregate(MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER, new Merger<String, String>() {
- @Override
- public String apply(final String aggKey, final String
aggOne, final String aggTwo) {
- return null;
- }
- }, Materialized.<String, String, SessionStore<Bytes,
byte[]>>as(INVALID_STORE_NAME));
+ groupedStream
+ .windowedBy(SessionWindows.with(ofMillis(10)))
+ .aggregate(
+ MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
+ (aggKey, aggOne, aggTwo) -> null,
+ Materialized.as(INVALID_STORE_NAME));
}
@SuppressWarnings("unchecked")
@@ -506,16 +476,10 @@ public class KGroupedStreamImplTest {
@Test
public void shouldAggregateWithDefaultSerdes() {
final Map<String, String> results = new HashMap<>();
- groupedStream.aggregate(
- MockInitializer.STRING_INIT,
- MockAggregator.TOSTRING_ADDER)
+ groupedStream
+ .aggregate(MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER)
.toStream()
- .foreach(new ForeachAction<String, String>() {
- @Override
- public void apply(final String key, final String value) {
- results.put(key, value);
- }
- });
+ .foreach(results::put);
try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), props)) {
processData(driver);
@@ -560,14 +524,11 @@ public class KGroupedStreamImplTest {
@Test
public void shouldCountWindowed() {
final List<KeyValue<Windowed<String>, Long>> results = new
ArrayList<>();
-
groupedStream.windowedBy(TimeWindows.of(ofMillis(500L))).count(Materialized.<String,
Long, WindowStore<Bytes, byte[]>>as("aggregate-by-key-windowed"))
+ groupedStream
+ .windowedBy(TimeWindows.of(ofMillis(500L)))
+ .count(Materialized.as("aggregate-by-key-windowed"))
.toStream()
- .foreach(new ForeachAction<Windowed<String>, Long>() {
- @Override
- public void apply(final Windowed<String> key, final Long
value) {
- results.add(KeyValue.pair(key, value));
- }
- });
+ .foreach((key, value) -> results.add(KeyValue.pair(key, value)));
doCountWindowed(results);
}
@@ -575,14 +536,11 @@ public class KGroupedStreamImplTest {
@Test
public void shouldCountWindowedWithInternalStoreName() {
final List<KeyValue<Windowed<String>, Long>> results = new
ArrayList<>();
- groupedStream.windowedBy(TimeWindows.of(ofMillis(500L))).count()
+ groupedStream
+ .windowedBy(TimeWindows.of(ofMillis(500L)))
+ .count()
.toStream()
- .foreach(new ForeachAction<Windowed<String>, Long>() {
- @Override
- public void apply(final Windowed<String> key, final Long
value) {
- results.add(KeyValue.pair(key, value));
- }
- });
+ .foreach((key, value) -> results.add(KeyValue.pair(key, value)));
doCountWindowed(results);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
index 99f1b81..09f93e7 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
@@ -21,11 +21,10 @@ import
org.apache.kafka.common.serialization.DoubleSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
-import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.KTable;
@@ -50,7 +49,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
-
public class KGroupedTableImplTest {
private final StreamsBuilder builder = new StreamsBuilder();
@@ -61,57 +59,84 @@ public class KGroupedTableImplTest {
@Before
public void before() {
- groupedTable = builder.table("blah", Consumed.with(Serdes.String(),
Serdes.String()))
- .groupBy(MockMapper.<String,
String>selectValueKeyValueMapper());
+ groupedTable = builder
+ .table("blah", Consumed.with(Serdes.String(), Serdes.String()))
+ .groupBy(MockMapper.selectValueKeyValueMapper());
}
@Test(expected = InvalidTopicException.class)
public void shouldNotAllowInvalidStoreNameOnAggregate() {
- groupedTable.aggregate(MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_REMOVER,
Materialized.<String, String, KeyValueStore<Bytes,
byte[]>>as(INVALID_STORE_NAME));
+ groupedTable.aggregate(
+ MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
+ MockAggregator.TOSTRING_REMOVER,
+ Materialized.as(INVALID_STORE_NAME));
}
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullInitializerOnAggregate() {
- groupedTable.aggregate(null, MockAggregator.TOSTRING_ADDER,
MockAggregator.TOSTRING_REMOVER, Materialized.<String, String,
KeyValueStore<Bytes, byte[]>>as("store"));
+ groupedTable.aggregate(
+ null,
+ MockAggregator.TOSTRING_ADDER,
+ MockAggregator.TOSTRING_REMOVER,
+ Materialized.as("store"));
}
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullAdderOnAggregate() {
- groupedTable.aggregate(MockInitializer.STRING_INIT, null,
MockAggregator.TOSTRING_REMOVER, Materialized.<String, String,
KeyValueStore<Bytes, byte[]>>as("store"));
+ groupedTable.aggregate(
+ MockInitializer.STRING_INIT,
+ null,
+ MockAggregator.TOSTRING_REMOVER,
+ Materialized.as("store"));
}
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullSubtractorOnAggregate() {
- groupedTable.aggregate(MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER, null, Materialized.<String, String,
KeyValueStore<Bytes, byte[]>>as("store"));
+ groupedTable.aggregate(
+ MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
+ null,
+ Materialized.as("store"));
}
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullAdderOnReduce() {
- groupedTable.reduce(null, MockReducer.STRING_REMOVER,
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store"));
+ groupedTable.reduce(
+ null,
+ MockReducer.STRING_REMOVER,
+ Materialized.as("store"));
}
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullSubtractorOnReduce() {
- groupedTable.reduce(MockReducer.STRING_ADDER, null,
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store"));
+ groupedTable.reduce(
+ MockReducer.STRING_ADDER,
+ null,
+ Materialized.as("store"));
}
@Test(expected = InvalidTopicException.class)
public void shouldNotAllowInvalidStoreNameOnReduce() {
- groupedTable.reduce(MockReducer.STRING_ADDER,
MockReducer.STRING_REMOVER, Materialized.<String, String, KeyValueStore<Bytes,
byte[]>>as(INVALID_STORE_NAME));
+ groupedTable.reduce(
+ MockReducer.STRING_ADDER,
+ MockReducer.STRING_REMOVER,
+ Materialized.as(INVALID_STORE_NAME));
}
private Map<String, Integer> getReducedResults(final KTable<String,
Integer> inputKTable) {
final Map<String, Integer> reducedResults = new HashMap<>();
- inputKTable.toStream().foreach(new ForeachAction<String, Integer>() {
- @Override
- public void apply(final String key, final Integer value) {
- reducedResults.put(key, value);
- }
- });
+ inputKTable
+ .toStream()
+ .foreach(reducedResults::put);
return reducedResults;
}
- private void assertReduced(final Map<String, Integer> reducedResults,
final String topic, final TopologyTestDriver driver) {
- final ConsumerRecordFactory<String, Double> recordFactory = new
ConsumerRecordFactory<>(new StringSerializer(), new DoubleSerializer());
+
+ private void assertReduced(final Map<String, Integer> reducedResults,
+ final String topic,
+ final TopologyTestDriver driver) {
+ final ConsumerRecordFactory<String, Double> recordFactory =
+ new ConsumerRecordFactory<>(new StringSerializer(), new
DoubleSerializer());
driver.pipeInput(recordFactory.create(topic, "A", 1.1, 10));
driver.pipeInput(recordFactory.create(topic, "B", 2.2, 10));
@@ -130,20 +155,20 @@ public class KGroupedTableImplTest {
@Test
public void shouldReduce() {
final KeyValueMapper<String, Number, KeyValue<String, Integer>>
intProjection =
- new KeyValueMapper<String, Number, KeyValue<String, Integer>>() {
- @Override
- public KeyValue<String, Integer> apply(final String key, final
Number value) {
- return KeyValue.pair(key, value.intValue());
- }
- };
-
- final KTable<String, Integer> reduced = builder.table(topic,
-
Consumed.with(Serdes.String(), Serdes.Double()),
-
Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("store")
-
.withKeySerde(Serdes.String())
-
.withValueSerde(Serdes.Double()))
+ (key, value) -> KeyValue.pair(key, value.intValue());
+
+ final KTable<String, Integer> reduced = builder
+ .table(
+ topic,
+ Consumed.with(Serdes.String(), Serdes.Double()),
+ Materialized.<String, Double, KeyValueStore<Bytes,
byte[]>>as("store")
+ .withKeySerde(Serdes.String())
+ .withValueSerde(Serdes.Double()))
.groupBy(intProjection)
- .reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR,
Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("reduced"));
+ .reduce(
+ MockReducer.INTEGER_ADDER,
+ MockReducer.INTEGER_SUBTRACTOR,
+ Materialized.as("reduced"));
final Map<String, Integer> results = getReducedResults(reduced);
try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), props)) {
@@ -155,18 +180,15 @@ public class KGroupedTableImplTest {
@Test
public void shouldReduceWithInternalStoreName() {
final KeyValueMapper<String, Number, KeyValue<String, Integer>>
intProjection =
- new KeyValueMapper<String, Number, KeyValue<String, Integer>>() {
- @Override
- public KeyValue<String, Integer> apply(final String key, final
Number value) {
- return KeyValue.pair(key, value.intValue());
- }
- };
-
- final KTable<String, Integer> reduced = builder.table(topic,
-
Consumed.with(Serdes.String(), Serdes.Double()),
-
Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("store")
-
.withKeySerde(Serdes.String())
-
.withValueSerde(Serdes.Double()))
+ (key, value) -> KeyValue.pair(key, value.intValue());
+
+ final KTable<String, Integer> reduced = builder
+ .table(
+ topic,
+ Consumed.with(Serdes.String(), Serdes.Double()),
+ Materialized.<String, Double, KeyValueStore<Bytes,
byte[]>>as("store")
+ .withKeySerde(Serdes.String())
+ .withValueSerde(Serdes.Double()))
.groupBy(intProjection)
.reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR);
@@ -181,20 +203,19 @@ public class KGroupedTableImplTest {
@Test
public void shouldReduceAndMaterializeResults() {
final KeyValueMapper<String, Number, KeyValue<String, Integer>>
intProjection =
- new KeyValueMapper<String, Number, KeyValue<String, Integer>>() {
- @Override
- public KeyValue<String, Integer> apply(final String key, final
Number value) {
- return KeyValue.pair(key, value.intValue());
- }
- };
-
- final KTable<String, Integer> reduced = builder.table(topic,
Consumed.with(Serdes.String(), Serdes.Double()))
- .groupBy(intProjection)
- .reduce(MockReducer.INTEGER_ADDER,
- MockReducer.INTEGER_SUBTRACTOR,
- Materialized.<String, Integer, KeyValueStore<Bytes,
byte[]>>as("reduce")
- .withKeySerde(Serdes.String())
- .withValueSerde(Serdes.Integer()));
+ (key, value) -> KeyValue.pair(key, value.intValue());
+
+ final KTable<String, Integer> reduced = builder
+ .table(
+ topic,
+ Consumed.with(Serdes.String(), Serdes.Double()))
+ .groupBy(intProjection)
+ .reduce(
+ MockReducer.INTEGER_ADDER,
+ MockReducer.INTEGER_SUBTRACTOR,
+ Materialized.<String, Integer, KeyValueStore<Bytes,
byte[]>>as("reduce")
+ .withKeySerde(Serdes.String())
+ .withValueSerde(Serdes.Integer()));
final Map<String, Integer> results = getReducedResults(reduced);
try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), props)) {
@@ -208,11 +229,17 @@ public class KGroupedTableImplTest {
@SuppressWarnings("unchecked")
@Test
public void shouldCountAndMaterializeResults() {
- final KTable<String, String> table = builder.table(topic,
Consumed.with(Serdes.String(), Serdes.String()));
- table.groupBy(MockMapper.selectValueKeyValueMapper(),
Grouped.with(Serdes.String(), Serdes.String()))
- .count(Materialized.<String, Long, KeyValueStore<Bytes,
byte[]>>as("count")
- .withKeySerde(Serdes.String())
- .withValueSerde(Serdes.Long()));
+ builder
+ .table(
+ topic,
+ Consumed.with(Serdes.String(), Serdes.String()))
+ .groupBy(
+ MockMapper.selectValueKeyValueMapper(),
+ Grouped.with(Serdes.String(), Serdes.String()))
+ .count(
+ Materialized.<String, Long, KeyValueStore<Bytes,
byte[]>>as("count")
+ .withKeySerde(Serdes.String())
+ .withValueSerde(Serdes.Long()));
try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), props)) {
processData(topic, driver);
@@ -225,14 +252,20 @@ public class KGroupedTableImplTest {
@SuppressWarnings("unchecked")
@Test
public void shouldAggregateAndMaterializeResults() {
- final KTable<String, String> table = builder.table(topic,
Consumed.with(Serdes.String(), Serdes.String()));
- table.groupBy(MockMapper.<String, String>selectValueKeyValueMapper(),
Grouped.with(Serdes.String(), Serdes.String()))
- .aggregate(MockInitializer.STRING_INIT,
- MockAggregator.TOSTRING_ADDER,
- MockAggregator.TOSTRING_REMOVER,
- Materialized.<String, String, KeyValueStore<Bytes,
byte[]>>as("aggregate")
- .withValueSerde(Serdes.String())
- .withKeySerde(Serdes.String()));
+ builder
+ .table(
+ topic,
+ Consumed.with(Serdes.String(), Serdes.String()))
+ .groupBy(
+ MockMapper.selectValueKeyValueMapper(),
+ Grouped.with(Serdes.String(), Serdes.String()))
+ .aggregate(
+ MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
+ MockAggregator.TOSTRING_REMOVER,
+ Materialized.<String, String, KeyValueStore<Bytes,
byte[]>>as("aggregate")
+ .withValueSerde(Serdes.String())
+ .withKeySerde(Serdes.String()));
try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), props)) {
processData(topic, driver);
@@ -251,54 +284,69 @@ public class KGroupedTableImplTest {
@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnReduceWhenMaterializedIsNull() {
- groupedTable.reduce(MockReducer.STRING_ADDER,
MockReducer.STRING_REMOVER, (Materialized) null);
+ groupedTable.reduce(
+ MockReducer.STRING_ADDER,
+ MockReducer.STRING_REMOVER,
+ (Materialized) null);
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnReduceWhenAdderIsNull() {
- groupedTable.reduce(null, MockReducer.STRING_REMOVER,
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store"));
+ groupedTable.reduce(
+ null,
+ MockReducer.STRING_REMOVER,
+ Materialized.as("store"));
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnReduceWhenSubtractorIsNull() {
- groupedTable.reduce(MockReducer.STRING_ADDER, null,
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store"));
+ groupedTable.reduce(
+ MockReducer.STRING_ADDER,
+ null,
+ Materialized.as("store"));
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnAggregateWhenInitializerIsNull() {
- groupedTable.aggregate(null,
- MockAggregator.TOSTRING_ADDER,
- MockAggregator.TOSTRING_REMOVER,
- Materialized.<String, String,
KeyValueStore<Bytes, byte[]>>as("store"));
+ groupedTable.aggregate(
+ null,
+ MockAggregator.TOSTRING_ADDER,
+ MockAggregator.TOSTRING_REMOVER,
+ Materialized.as("store"));
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnAggregateWhenAdderIsNull() {
- groupedTable.aggregate(MockInitializer.STRING_INIT,
- null,
- MockAggregator.TOSTRING_REMOVER,
- Materialized.<String, String,
KeyValueStore<Bytes, byte[]>>as("store"));
+ groupedTable.aggregate(
+ MockInitializer.STRING_INIT,
+ null,
+ MockAggregator.TOSTRING_REMOVER,
+ Materialized.as("store"));
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnAggregateWhenSubtractorIsNull() {
- groupedTable.aggregate(MockInitializer.STRING_INIT,
- MockAggregator.TOSTRING_ADDER,
- null,
- Materialized.<String, String,
KeyValueStore<Bytes, byte[]>>as("store"));
+ groupedTable.aggregate(
+ MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
+ null,
+ Materialized.as("store"));
}
@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnAggregateWhenMaterializedIsNull() {
- groupedTable.aggregate(MockInitializer.STRING_INIT,
- MockAggregator.TOSTRING_ADDER,
- MockAggregator.TOSTRING_REMOVER,
- (Materialized) null);
+ groupedTable.aggregate(
+ MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
+ MockAggregator.TOSTRING_REMOVER,
+ (Materialized) null);
}
- private void processData(final String topic, final TopologyTestDriver
driver) {
- final ConsumerRecordFactory<String, String> recordFactory = new
ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+ private void processData(final String topic,
+ final TopologyTestDriver driver) {
+ final ConsumerRecordFactory<String, String> recordFactory =
+ new ConsumerRecordFactory<>(new StringSerializer(), new
StringSerializer());
driver.pipeInput(recordFactory.create(topic, "A", "1"));
driver.pipeInput(recordFactory.create(topic, "B", "1"));
driver.pipeInput(recordFactory.create(topic, "C", "1"));
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index f2e3cc9..133bd55 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -19,14 +19,13 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.ValueJoiner;
import
org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessor;
@@ -53,7 +52,8 @@ public class KStreamKStreamJoinTest {
final private String topic2 = "topic2";
private final Consumed<Integer, String> consumed =
Consumed.with(Serdes.Integer(), Serdes.String());
- private final ConsumerRecordFactory<Integer, String> recordFactory = new
ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
+ private final ConsumerRecordFactory<Integer, String> recordFactory =
+ new ConsumerRecordFactory<>(new IntegerSerializer(), new
StringSerializer());
private final Properties props =
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
@Test
@@ -62,16 +62,12 @@ public class KStreamKStreamJoinTest {
final KStream<String, Integer> left = builder.stream("left",
Consumed.with(Serdes.String(), Serdes.Integer()));
final KStream<String, Integer> right = builder.stream("right",
Consumed.with(Serdes.String(), Serdes.Integer()));
- final ConsumerRecordFactory<String, Integer> recordFactory = new
ConsumerRecordFactory<>(new StringSerializer(), new IntegerSerializer());
+ final ConsumerRecordFactory<String, Integer> recordFactory =
+ new ConsumerRecordFactory<>(new StringSerializer(), new
IntegerSerializer());
left.join(
right,
- new ValueJoiner<Integer, Integer, Integer>() {
- @Override
- public Integer apply(final Integer value1, final Integer
value2) {
- return value1 + value2;
- }
- },
+ (value1, value2) -> value1 + value2,
JoinWindows.of(ofMillis(100)),
Joined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer())
);
@@ -106,7 +102,8 @@ public class KStreamKStreamJoinTest {
Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
joined.process(supplier);
- final Collection<Set<String>> copartitionGroups =
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+ final Collection<Set<String>> copartitionGroups =
+
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)),
copartitionGroups.iterator().next());
@@ -208,7 +205,8 @@ public class KStreamKStreamJoinTest {
JoinWindows.of(ofMillis(100)),
Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
joined.process(supplier);
- final Collection<Set<String>> copartitionGroups =
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+ final Collection<Set<String>> copartitionGroups =
+
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)),
copartitionGroups.iterator().next());
@@ -313,7 +311,8 @@ public class KStreamKStreamJoinTest {
Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
joined.process(supplier);
- final Collection<Set<String>> copartitionGroups =
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+ final Collection<Set<String>> copartitionGroups =
+
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)),
copartitionGroups.iterator().next());
@@ -536,7 +535,8 @@ public class KStreamKStreamJoinTest {
Serdes.String()));
joined.process(supplier);
- final Collection<Set<String>> copartitionGroups =
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+ final Collection<Set<String>> copartitionGroups =
+
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)),
copartitionGroups.iterator().next());
@@ -645,7 +645,8 @@ public class KStreamKStreamJoinTest {
Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
joined.process(supplier);
- final Collection<Set<String>> copartitionGroups =
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+ final Collection<Set<String>> copartitionGroups =
+
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)),
copartitionGroups.iterator().next());
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
index ced57b2..9906556 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
@@ -17,7 +17,6 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.easymock.EasyMock;
@@ -25,7 +24,7 @@ import org.junit.Before;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
-import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
@@ -34,23 +33,16 @@ import static org.junit.Assert.assertEquals;
public class KStreamPrintTest {
private ByteArrayOutputStream byteOutStream;
-
- private KeyValueMapper<Integer, String, String> mapper;
- private KStreamPrint kStreamPrint;
- private Processor printProcessor;
+ private Processor<Integer, String> printProcessor;
@Before
- public void setUp() throws Exception {
+ public void setUp() {
byteOutStream = new ByteArrayOutputStream();
- mapper = new KeyValueMapper<Integer, String, String>() {
- @Override
- public String apply(final Integer key, final String value) {
- return String.format("%d, %s", key, value);
- }
- };
-
- kStreamPrint = new KStreamPrint<>(new
PrintForeachAction<>(byteOutStream, mapper, "test-stream"));
+ final KStreamPrint<Integer, String> kStreamPrint = new
KStreamPrint<>(new PrintForeachAction<>(
+ byteOutStream,
+ (key, value) -> String.format("%d, %s", key, value),
+ "test-stream"));
printProcessor = kStreamPrint.get();
final ProcessorContext processorContext =
EasyMock.createNiceMock(ProcessorContext.class);
@@ -62,33 +54,27 @@ public class KStreamPrintTest {
@Test
@SuppressWarnings("unchecked")
public void testPrintStreamWithProvidedKeyValueMapper() {
-
final List<KeyValue<Integer, String>> inputRecords = Arrays.asList(
new KeyValue<>(0, "zero"),
new KeyValue<>(1, "one"),
new KeyValue<>(2, "two"),
new KeyValue<>(3, "three"));
- final String[] expectedResult = {"[test-stream]: 0, zero",
"[test-stream]: 1, one", "[test-stream]: 2, two", "[test-stream]: 3, three"};
+ final String[] expectedResult = {
+ "[test-stream]: 0, zero",
+ "[test-stream]: 1, one",
+ "[test-stream]: 2, two",
+ "[test-stream]: 3, three"};
- doTest(inputRecords, expectedResult);
- }
-
- private void assertFlushData(final String[] expectedResult, final
ByteArrayOutputStream byteOutStream) {
+ for (final KeyValue<Integer, String> record: inputRecords) {
+ printProcessor.process(record.key, record.value);
+ }
+ printProcessor.close();
- final String[] flushOutDatas = new String(byteOutStream.toByteArray(),
Charset.forName("UTF-8")).split("\\r*\\n");
+ final String[] flushOutDatas = new String(byteOutStream.toByteArray(),
StandardCharsets.UTF_8).split("\\r*\\n");
for (int i = 0; i < flushOutDatas.length; i++) {
assertEquals(expectedResult[i], flushOutDatas[i]);
}
}
- @SuppressWarnings("unchecked")
- private <K, V> void doTest(final List<KeyValue<K, V>> inputRecords, final
String[] expectedResult) {
-
- for (final KeyValue<K, V> record: inputRecords) {
- printProcessor.process(record.key, record.value);
- }
- printProcessor.close();
- assertFlushData(expectedResult, byteOutStream);
- }
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index 1074f02f..cd2031f 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -108,10 +108,12 @@ public class KStreamSessionWindowAggregateProcessorTest {
}
private void initStore(final boolean enableCaching) {
- final StoreBuilder<SessionStore<String, Long>> storeBuilder =
Stores.sessionStoreBuilder(Stores.persistentSessionStore(STORE_NAME,
ofMillis(GAP_MS * 3)),
-
Serdes.String(),
-
Serdes.Long())
-
.withLoggingDisabled();
+ final StoreBuilder<SessionStore<String, Long>> storeBuilder =
+ Stores.sessionStoreBuilder(
+ Stores.persistentSessionStore(STORE_NAME, ofMillis(GAP_MS *
3)),
+ Serdes.String(),
+ Serdes.Long())
+ .withLoggingDisabled();
if (enableCaching) {
storeBuilder.withCachingEnabled();
@@ -133,12 +135,12 @@ public class KStreamSessionWindowAggregateProcessorTest {
context.setTime(500);
processor.process("john", "second");
- final KeyValueIterator<Windowed<String>, Long> values =
sessionStore.findSessions("john", 0, 2000);
+ final KeyValueIterator<Windowed<String>, Long> values =
+ sessionStore.findSessions("john", 0, 2000);
assertTrue(values.hasNext());
assertEquals(Long.valueOf(2), values.next().value);
}
-
@Test
public void shouldMergeSessions() {
context.setTime(0);
@@ -156,7 +158,8 @@ public class KStreamSessionWindowAggregateProcessorTest {
context.setTime(GAP_MS / 2);
processor.process(sessionId, "third");
- final KeyValueIterator<Windowed<String>, Long> iterator =
sessionStore.findSessions(sessionId, 0, GAP_MS + 1);
+ final KeyValueIterator<Windowed<String>, Long> iterator =
+ sessionStore.findSessions(sessionId, 0, GAP_MS + 1);
final KeyValue<Windowed<String>, Long> kv = iterator.next();
assertEquals(Long.valueOf(3), kv.value);
@@ -168,7 +171,8 @@ public class KStreamSessionWindowAggregateProcessorTest {
context.setTime(0);
processor.process("mel", "first");
processor.process("mel", "second");
- final KeyValueIterator<Windowed<String>, Long> iterator =
sessionStore.findSessions("mel", 0, 0);
+ final KeyValueIterator<Windowed<String>, Long> iterator =
+ sessionStore.findSessions("mel", 0, 0);
assertEquals(Long.valueOf(2L), iterator.next().value);
assertFalse(iterator.hasNext());
}
@@ -199,21 +203,22 @@ public class KStreamSessionWindowAggregateProcessorTest {
}
-
@Test
public void shouldRemoveMergedSessionsFromStateStore() {
context.setTime(0);
processor.process("a", "1");
// first ensure it is in the store
- final KeyValueIterator<Windowed<String>, Long> a1 =
sessionStore.findSessions("a", 0, 0);
+ final KeyValueIterator<Windowed<String>, Long> a1 =
+ sessionStore.findSessions("a", 0, 0);
assertEquals(KeyValue.pair(new Windowed<>("a", new SessionWindow(0,
0)), 1L), a1.next());
context.setTime(100);
processor.process("a", "2");
// a1 from above should have been removed
// should have merged session in store
- final KeyValueIterator<Windowed<String>, Long> a2 =
sessionStore.findSessions("a", 0, 100);
+ final KeyValueIterator<Windowed<String>, Long> a2 =
+ sessionStore.findSessions("a", 0, 100);
assertEquals(KeyValue.pair(new Windowed<>("a", new SessionWindow(0,
100)), 2L), a2.next());
assertFalse(a2.hasNext());
}
@@ -250,7 +255,6 @@ public class KStreamSessionWindowAggregateProcessorTest {
);
}
-
@Test
public void shouldGetAggregatedValuesFromValueGetter() {
final KTableValueGetter<Windowed<String>, Long> getter =
sessionAggregator.view().get();
@@ -315,8 +319,12 @@ public class KStreamSessionWindowAggregateProcessorTest {
processor.process(null, "1");
LogCaptureAppender.unregister(appender);
- assertEquals(1.0, getMetricByName(context.metrics().metrics(),
"skipped-records-total", "stream-metrics").metricValue());
- assertThat(appender.getMessages(), hasItem("Skipping record due to
null key. value=[1] topic=[topic] partition=[-3] offset=[-2]"));
+ assertEquals(
+ 1.0,
+ getMetricByName(context.metrics().metrics(),
"skipped-records-total", "stream-metrics").metricValue());
+ assertThat(
+ appender.getMessages(),
+ hasItem("Skipping record due to null key. value=[1] topic=[topic]
partition=[-3] offset=[-2]"));
}
@Test
@@ -364,9 +372,14 @@ public class KStreamSessionWindowAggregateProcessorTest {
)
);
- assertThat((Double) metrics.metrics().get(dropRate).metricValue(),
greaterThan(0.0));
-
- assertThat(appender.getMessages(), hasItem("Skipping record for
expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[0]
window=[0,0) expiration=[10]"));
- assertThat(appender.getMessages(), hasItem("Skipping record for
expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[1]
window=[1,1) expiration=[10]"));
+ assertThat(
+ (Double) metrics.metrics().get(dropRate).metricValue(),
+ greaterThan(0.0));
+ assertThat(
+ appender.getMessages(),
+ hasItem("Skipping record for expired window. key=[A] topic=[topic]
partition=[-3] offset=[-2] timestamp=[0] window=[0,0) expiration=[10]"));
+ assertThat(
+ appender.getMessages(),
+ hasItem("Skipping record for expired window. key=[A] topic=[topic]
partition=[-3] offset=[-2] timestamp=[1] window=[1,1) expiration=[10]"));
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index dd39291..c8aef07 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -31,7 +31,6 @@ import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
@@ -71,17 +70,15 @@ public class KTableImplTest {
private final Consumed<String, String> consumed =
Consumed.with(Serdes.String(), Serdes.String());
private final Produced<String, String> produced =
Produced.with(Serdes.String(), Serdes.String());
private final Properties props =
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
- private final ConsumerRecordFactory<String, String> recordFactory = new
ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+ private final ConsumerRecordFactory<String, String> recordFactory =
+ new ConsumerRecordFactory<>(new StringSerializer(), new
StringSerializer());
+ private final Serde<String> mySerde = new Serdes.StringSerde();
- private StreamsBuilder builder;
private KTable<String, String> table;
- private Serde<String> mySerde = new Serdes.StringSerde();
-
@Before
public void setUp() {
- builder = new StreamsBuilder();
- table = builder.table("test");
+ table = new StreamsBuilder().table("test");
}
@Test
@@ -96,21 +93,11 @@ public class KTableImplTest {
final MockProcessorSupplier<String, Object> supplier = new
MockProcessorSupplier<>();
table1.toStream().process(supplier);
- final KTable<String, Integer> table2 = table1.mapValues(new
ValueMapper<String, Integer>() {
- @Override
- public Integer apply(final String value) {
- return new Integer(value);
- }
- });
+ final KTable<String, Integer> table2 = table1.mapValues(Integer::new);
table2.toStream().process(supplier);
- final KTable<String, Integer> table3 = table2.filter(new
Predicate<String, Integer>() {
- @Override
- public boolean test(final String key, final Integer value) {
- return (value % 2) == 0;
- }
- });
+ final KTable<String, Integer> table3 = table2.filter((key, value) ->
(value % 2) == 0);
table3.toStream().process(supplier);
@@ -142,63 +129,119 @@ public class KTableImplTest {
final KeyValueMapper<String, String, String> selector = (key, value)
-> key;
final ValueMapper<String, String> mapper = value -> value;
final ValueJoiner<String, String, String> joiner = (value1, value2) ->
value1;
- final ValueTransformerWithKeySupplier<String, String, String>
valueTransformerWithKeySupplier = () -> new ValueTransformerWithKey<String,
String, String>() {
- @Override
- public void init(final ProcessorContext context) {}
-
- @Override
- public String transform(final String key, final String value) {
- return value;
- }
-
- @Override
- public void close() {}
- };
-
- assertEquals(((AbstractStream) table1.filter((key, value) ->
false)).keySerde(), consumedInternal.keySerde());
- assertEquals(((AbstractStream) table1.filter((key, value) ->
false)).valueSerde(), consumedInternal.valueSerde());
- assertEquals(((AbstractStream) table1.filter((key, value) -> false,
Materialized.with(mySerde, mySerde))).keySerde(), mySerde);
- assertEquals(((AbstractStream) table1.filter((key, value) -> false,
Materialized.with(mySerde, mySerde))).valueSerde(), mySerde);
-
- assertEquals(((AbstractStream) table1.filterNot((key, value) ->
false)).keySerde(), consumedInternal.keySerde());
- assertEquals(((AbstractStream) table1.filterNot((key, value) ->
false)).valueSerde(), consumedInternal.valueSerde());
- assertEquals(((AbstractStream) table1.filterNot((key, value) -> false,
Materialized.with(mySerde, mySerde))).keySerde(), mySerde);
- assertEquals(((AbstractStream) table1.filterNot((key, value) -> false,
Materialized.with(mySerde, mySerde))).valueSerde(), mySerde);
+ final ValueTransformerWithKeySupplier<String, String, String>
valueTransformerWithKeySupplier =
+ () -> new ValueTransformerWithKey<String, String, String>() {
+ @Override
+ public void init(final ProcessorContext context) {}
+
+ @Override
+ public String transform(final String key, final String value) {
+ return value;
+ }
- assertEquals(((AbstractStream) table1.mapValues(mapper)).keySerde(),
consumedInternal.keySerde());
+ @Override
+ public void close() {}
+ };
+
+ assertEquals(
+ ((AbstractStream) table1.filter((key, value) -> false)).keySerde(),
+ consumedInternal.keySerde());
+ assertEquals(
+ ((AbstractStream) table1.filter((key, value) ->
false)).valueSerde(),
+ consumedInternal.valueSerde());
+ assertEquals(
+ ((AbstractStream) table1.filter((key, value) -> false,
Materialized.with(mySerde, mySerde))).keySerde(),
+ mySerde);
+ assertEquals(
+ ((AbstractStream) table1.filter((key, value) -> false,
Materialized.with(mySerde, mySerde))).valueSerde(),
+ mySerde);
+
+ assertEquals(
+ ((AbstractStream) table1.filterNot((key, value) ->
false)).keySerde(),
+ consumedInternal.keySerde());
+ assertEquals(
+ ((AbstractStream) table1.filterNot((key, value) ->
false)).valueSerde(),
+ consumedInternal.valueSerde());
+ assertEquals(
+ ((AbstractStream) table1.filterNot((key, value) -> false,
Materialized.with(mySerde, mySerde))).keySerde(),
+ mySerde);
+ assertEquals(
+ ((AbstractStream) table1.filterNot((key, value) -> false,
Materialized.with(mySerde, mySerde))).valueSerde(),
+ mySerde);
+
+ assertEquals(
+ ((AbstractStream) table1.mapValues(mapper)).keySerde(),
+ consumedInternal.keySerde());
assertNull(((AbstractStream) table1.mapValues(mapper)).valueSerde());
- assertEquals(((AbstractStream) table1.mapValues(mapper,
Materialized.with(mySerde, mySerde))).keySerde(), mySerde);
- assertEquals(((AbstractStream) table1.mapValues(mapper,
Materialized.with(mySerde, mySerde))).valueSerde(), mySerde);
-
- assertEquals(((AbstractStream) table1.toStream()).keySerde(),
consumedInternal.keySerde());
- assertEquals(((AbstractStream) table1.toStream()).valueSerde(),
consumedInternal.valueSerde());
+ assertEquals(
+ ((AbstractStream) table1.mapValues(mapper,
Materialized.with(mySerde, mySerde))).keySerde(),
+ mySerde);
+ assertEquals(
+ ((AbstractStream) table1.mapValues(mapper,
Materialized.with(mySerde, mySerde))).valueSerde(),
+ mySerde);
+
+ assertEquals(
+ ((AbstractStream) table1.toStream()).keySerde(),
+ consumedInternal.keySerde());
+ assertEquals(
+ ((AbstractStream) table1.toStream()).valueSerde(),
+ consumedInternal.valueSerde());
assertNull(((AbstractStream) table1.toStream(selector)).keySerde());
- assertEquals(((AbstractStream)
table1.toStream(selector)).valueSerde(), consumedInternal.valueSerde());
+ assertEquals(
+ ((AbstractStream) table1.toStream(selector)).valueSerde(),
+ consumedInternal.valueSerde());
- assertEquals(((AbstractStream)
table1.transformValues(valueTransformerWithKeySupplier)).keySerde(),
consumedInternal.keySerde());
+ assertEquals(
+ ((AbstractStream)
table1.transformValues(valueTransformerWithKeySupplier)).keySerde(),
+ consumedInternal.keySerde());
assertNull(((AbstractStream)
table1.transformValues(valueTransformerWithKeySupplier)).valueSerde());
- assertEquals(((AbstractStream)
table1.transformValues(valueTransformerWithKeySupplier,
Materialized.with(mySerde, mySerde))).keySerde(), mySerde);
- assertEquals(((AbstractStream)
table1.transformValues(valueTransformerWithKeySupplier,
Materialized.with(mySerde, mySerde))).valueSerde(), mySerde);
-
- assertEquals(((AbstractStream)
table1.groupBy(KeyValue::new)).keySerde(), null);
- assertEquals(((AbstractStream)
table1.groupBy(KeyValue::new)).valueSerde(), null);
- assertEquals(((AbstractStream) table1.groupBy(KeyValue::new,
Grouped.with(mySerde, mySerde))).keySerde(), mySerde);
- assertEquals(((AbstractStream) table1.groupBy(KeyValue::new,
Grouped.with(mySerde, mySerde))).valueSerde(), mySerde);
-
- assertEquals(((AbstractStream) table1.join(table1,
joiner)).keySerde(), consumedInternal.keySerde());
- assertEquals(((AbstractStream) table1.join(table1,
joiner)).valueSerde(), null);
- assertEquals(((AbstractStream) table1.join(table1, joiner,
Materialized.with(mySerde, mySerde))).keySerde(), mySerde);
- assertEquals(((AbstractStream) table1.join(table1, joiner,
Materialized.with(mySerde, mySerde))).valueSerde(), mySerde);
-
- assertEquals(((AbstractStream) table1.leftJoin(table1,
joiner)).keySerde(), consumedInternal.keySerde());
- assertEquals(((AbstractStream) table1.leftJoin(table1,
joiner)).valueSerde(), null);
- assertEquals(((AbstractStream) table1.leftJoin(table1, joiner,
Materialized.with(mySerde, mySerde))).keySerde(), mySerde);
- assertEquals(((AbstractStream) table1.leftJoin(table1, joiner,
Materialized.with(mySerde, mySerde))).valueSerde(), mySerde);
-
- assertEquals(((AbstractStream) table1.outerJoin(table1,
joiner)).keySerde(), consumedInternal.keySerde());
- assertEquals(((AbstractStream) table1.outerJoin(table1,
joiner)).valueSerde(), null);
- assertEquals(((AbstractStream) table1.outerJoin(table1, joiner,
Materialized.with(mySerde, mySerde))).keySerde(), mySerde);
- assertEquals(((AbstractStream) table1.outerJoin(table1, joiner,
Materialized.with(mySerde, mySerde))).valueSerde(), mySerde);
+ assertEquals(
+ ((AbstractStream)
table1.transformValues(valueTransformerWithKeySupplier,
Materialized.with(mySerde, mySerde))).keySerde(),
+ mySerde);
+ assertEquals(((AbstractStream)
table1.transformValues(valueTransformerWithKeySupplier,
Materialized.with(mySerde, mySerde))).valueSerde(),
+ mySerde);
+
+ assertNull(((AbstractStream)
table1.groupBy(KeyValue::new)).keySerde());
+ assertNull(((AbstractStream)
table1.groupBy(KeyValue::new)).valueSerde());
+ assertEquals(
+ ((AbstractStream) table1.groupBy(KeyValue::new,
Grouped.with(mySerde, mySerde))).keySerde(),
+ mySerde);
+ assertEquals(
+ ((AbstractStream) table1.groupBy(KeyValue::new,
Grouped.with(mySerde, mySerde))).valueSerde(),
+ mySerde);
+
+ assertEquals(
+ ((AbstractStream) table1.join(table1, joiner)).keySerde(),
+ consumedInternal.keySerde());
+ assertNull(((AbstractStream) table1.join(table1,
joiner)).valueSerde());
+ assertEquals(
+ ((AbstractStream) table1.join(table1, joiner,
Materialized.with(mySerde, mySerde))).keySerde(),
+ mySerde);
+ assertEquals(
+ ((AbstractStream) table1.join(table1, joiner,
Materialized.with(mySerde, mySerde))).valueSerde(),
+ mySerde);
+
+ assertEquals(
+ ((AbstractStream) table1.leftJoin(table1, joiner)).keySerde(),
+ consumedInternal.keySerde());
+ assertNull(((AbstractStream) table1.leftJoin(table1,
joiner)).valueSerde());
+ assertEquals(
+ ((AbstractStream) table1.leftJoin(table1, joiner,
Materialized.with(mySerde, mySerde))).keySerde(),
+ mySerde);
+ assertEquals(
+ ((AbstractStream) table1.leftJoin(table1, joiner,
Materialized.with(mySerde, mySerde))).valueSerde(),
+ mySerde);
+
+ assertEquals(
+ ((AbstractStream) table1.outerJoin(table1, joiner)).keySerde(),
+ consumedInternal.keySerde());
+ assertNull(((AbstractStream) table1.outerJoin(table1,
joiner)).valueSerde());
+ assertEquals(
+ ((AbstractStream) table1.outerJoin(table1, joiner,
Materialized.with(mySerde, mySerde))).keySerde(),
+ mySerde);
+ assertEquals(
+ ((AbstractStream) table1.outerJoin(table1, joiner,
Materialized.with(mySerde, mySerde))).valueSerde(),
+ mySerde);
}
@Test
@@ -209,23 +252,12 @@ public class KTableImplTest {
final StreamsBuilder builder = new StreamsBuilder();
final KTableImpl<String, String, String> table1 =
- (KTableImpl<String, String, String>) builder.table(topic1,
consumed);
+ (KTableImpl<String, String, String>) builder.table(topic1,
consumed);
builder.table(topic2, consumed);
- final KTableImpl<String, String, Integer> table1Mapped =
(KTableImpl<String, String, Integer>) table1.mapValues(
- new ValueMapper<String, Integer>() {
- @Override
- public Integer apply(final String value) {
- return new Integer(value);
- }
- });
- table1Mapped.filter(
- new Predicate<String, Integer>() {
- @Override
- public boolean test(final String key, final Integer value)
{
- return (value % 2) == 0;
- }
- });
+ final KTableImpl<String, String, Integer> table1Mapped =
+ (KTableImpl<String, String, Integer>)
table1.mapValues(Integer::new);
+ table1Mapped.filter((key, value) -> (value % 2) == 0);
try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), props)) {
assertEquals(0, driver.getAllStateStores().size());
@@ -240,31 +272,15 @@ public class KTableImplTest {
final StreamsBuilder builder = new StreamsBuilder();
final KTableImpl<String, String, String> table1 =
- (KTableImpl<String, String, String>) builder.table(topic1,
consumed);
+ (KTableImpl<String, String, String>) builder.table(topic1,
consumed);
final KTableImpl<String, String, String> table2 =
- (KTableImpl<String, String, String>) builder.table(topic2,
consumed);
-
- final KTableImpl<String, String, Integer> table1Mapped =
(KTableImpl<String, String, Integer>) table1.mapValues(
- new ValueMapper<String, Integer>() {
- @Override
- public Integer apply(final String value) {
- return new Integer(value);
- }
- });
- final KTableImpl<String, Integer, Integer> table1MappedFiltered =
(KTableImpl<String, Integer, Integer>) table1Mapped.filter(
- new Predicate<String, Integer>() {
- @Override
- public boolean test(final String key, final Integer value)
{
- return (value % 2) == 0;
- }
- });
- table2.join(table1MappedFiltered,
- new ValueJoiner<String, Integer, String>() {
- @Override
- public String apply(final String v1, final Integer v2) {
- return v1 + v2;
- }
- });
+ (KTableImpl<String, String, String>) builder.table(topic2,
consumed);
+
+ final KTableImpl<String, String, Integer> table1Mapped =
+ (KTableImpl<String, String, Integer>)
table1.mapValues(Integer::new);
+ final KTableImpl<String, Integer, Integer> table1MappedFiltered =
+ (KTableImpl<String, Integer, Integer>) table1Mapped.filter((key,
value) -> (value % 2) == 0);
+ table2.join(table1MappedFiltered, (v1, v2) -> v1 + v2);
try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), props)) {
assertEquals(2, driver.getAllStateStores().size());
@@ -280,30 +296,37 @@ public class KTableImplTest {
}
}
throw new AssertionError("No processor named '" + processorName + "'"
- + "found in the provided Topology:\n" + topology.describe());
+ + "found in the provided Topology:\n" + topology.describe());
}
@Test
- public void shouldCreateSourceAndSinkNodesForRepartitioningTopic() throws
NoSuchFieldException, IllegalAccessException {
+ public void shouldCreateSourceAndSinkNodesForRepartitioningTopic() throws
Exception {
final String topic1 = "topic1";
final String storeName1 = "storeName1";
final StreamsBuilder builder = new StreamsBuilder();
final KTableImpl<String, String, String> table1 =
- (KTableImpl<String, String, String>) builder.table(topic1,
- consumed,
-
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(storeName1)
-
.withKeySerde(Serdes.String())
-
.withValueSerde(Serdes.String())
- );
-
- table1.groupBy(MockMapper.<String, String>noOpKeyValueMapper())
- .aggregate(MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_REMOVER,
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("mock-result1"));
-
-
- table1.groupBy(MockMapper.<String, String>noOpKeyValueMapper())
- .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER,
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("mock-result2"));
+ (KTableImpl<String, String, String>) builder.table(
+ topic1,
+ consumed,
+ Materialized.<String, String, KeyValueStore<Bytes,
byte[]>>as(storeName1)
+ .withKeySerde(Serdes.String())
+ .withValueSerde(Serdes.String())
+ );
+
+ table1.groupBy(MockMapper.noOpKeyValueMapper())
+ .aggregate(
+ MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
+ MockAggregator.TOSTRING_REMOVER,
+ Materialized.as("mock-result1"));
+
+ table1.groupBy(MockMapper.noOpKeyValueMapper())
+ .reduce(
+ MockReducer.STRING_ADDER,
+ MockReducer.STRING_REMOVER,
+ Materialized.as("mock-result2"));
final Topology topology = builder.build();
try (final TopologyTestDriverWrapper driver = new
TopologyTestDriverWrapper(topology, props)) {
@@ -315,8 +338,12 @@ public class KTableImplTest {
assertTopologyContainsProcessor(topology,
"KSTREAM-SINK-0000000007");
assertTopologyContainsProcessor(topology,
"KSTREAM-SOURCE-0000000008");
- final Field valSerializerField = ((SinkNode)
driver.getProcessor("KSTREAM-SINK-0000000003")).getClass().getDeclaredField("valSerializer");
- final Field valDeserializerField = ((SourceNode)
driver.getProcessor("KSTREAM-SOURCE-0000000004")).getClass().getDeclaredField("valDeserializer");
+ final Field valSerializerField = ((SinkNode)
driver.getProcessor("KSTREAM-SINK-0000000003"))
+ .getClass()
+ .getDeclaredField("valSerializer");
+ final Field valDeserializerField = ((SourceNode)
driver.getProcessor("KSTREAM-SOURCE-0000000004"))
+ .getClass()
+ .getDeclaredField("valDeserializer");
valSerializerField.setAccessible(true);
valDeserializerField.setAccessible(true);
@@ -394,22 +421,12 @@ public class KTableImplTest {
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnFilterWhenMaterializedIsNull() {
- table.filter(new Predicate<String, String>() {
- @Override
- public boolean test(final String key, final String value) {
- return false;
- }
- }, (Materialized) null);
+ table.filter((key, value) -> false, (Materialized) null);
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnFilterNotWhenMaterializedIsNull() {
- table.filterNot(new Predicate<String, String>() {
- @Override
- public boolean test(final String key, final String value) {
- return false;
- }
- }, (Materialized) null);
+ table.filterNot((key, value) -> false, (Materialized) null);
}
@Test(expected = NullPointerException.class)
@@ -435,14 +452,16 @@ public class KTableImplTest {
@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void
shouldThrowNullPointerOnTransformValuesWithKeyWhenMaterializedIsNull() {
- final ValueTransformerWithKeySupplier<String, String, ?>
valueTransformerSupplier = mock(ValueTransformerWithKeySupplier.class);
+ final ValueTransformerWithKeySupplier<String, String, ?>
valueTransformerSupplier =
+ mock(ValueTransformerWithKeySupplier.class);
table.transformValues(valueTransformerSupplier, (Materialized) null);
}
@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void
shouldThrowNullPointerOnTransformValuesWithKeyWhenStoreNamesNull() {
- final ValueTransformerWithKeySupplier<String, String, ?>
valueTransformerSupplier = mock(ValueTransformerWithKeySupplier.class);
+ final ValueTransformerWithKeySupplier<String, String, ?>
valueTransformerSupplier =
+ mock(ValueTransformerWithKeySupplier.class);
table.transformValues(valueTransformerSupplier, (String[]) null);
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index 00791aa..b163915 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -47,10 +47,13 @@ import static org.junit.Assert.assertTrue;
public class KTableMapValuesTest {
private final Consumed<String, String> consumed =
Consumed.with(Serdes.String(), Serdes.String());
- private final ConsumerRecordFactory<String, String> recordFactory = new
ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+ private final ConsumerRecordFactory<String, String> recordFactory =
+ new ConsumerRecordFactory<>(new StringSerializer(), new
StringSerializer());
private final Properties props =
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
- private void doTestKTable(final StreamsBuilder builder, final String
topic1, final MockProcessorSupplier<String, Integer> supplier) {
+ private void doTestKTable(final StreamsBuilder builder,
+ final String topic1,
+ final MockProcessorSupplier<String, Integer>
supplier) {
try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), props)) {
driver.pipeInput(recordFactory.create(topic1, "A", "1"));
driver.pipeInput(recordFactory.create(topic1, "B", "2"));
@@ -82,7 +85,11 @@ public class KTableMapValuesTest {
final String topic1 = "topic1";
final KTable<String, String> table1 = builder.table(topic1, consumed);
- final KTable<String, Integer> table2 = table1.mapValues(value ->
value.charAt(0) - 48, Materialized.<String, Integer, KeyValueStore<Bytes,
byte[]>>as("anyName").withValueSerde(Serdes.Integer()));
+ final KTable<String, Integer> table2 = table1
+ .mapValues(
+ value -> value.charAt(0) - 48,
+ Materialized.<String, Integer, KeyValueStore<Bytes,
byte[]>>as("anyName")
+ .withValueSerde(Serdes.Integer()));
final MockProcessorSupplier<String, Integer> supplier = new
MockProcessorSupplier<>();
table2.toStream().process(supplier);
@@ -167,11 +174,15 @@ public class KTableMapValuesTest {
final KTableImpl<String, String, String> table1 =
(KTableImpl<String, String, String>) builder.table(topic1,
consumed);
final KTableImpl<String, String, Integer> table2 =
- (KTableImpl<String, String, Integer>)
table1.mapValues(Integer::new,
- Materialized.<String, Integer, KeyValueStore<Bytes,
byte[]>>as(storeName2).withValueSerde(Serdes.Integer()));
+ (KTableImpl<String, String, Integer>) table1.mapValues(
+ Integer::new,
+ Materialized.<String, Integer, KeyValueStore<Bytes,
byte[]>>as(storeName2)
+ .withValueSerde(Serdes.Integer()));
final KTableImpl<String, String, Integer> table3 =
- (KTableImpl<String, String, Integer>) table1.mapValues(value ->
new Integer(value) * (-1),
- Materialized.<String, Integer, KeyValueStore<Bytes,
byte[]>>as(storeName3).withValueSerde(Serdes.Integer()));
+ (KTableImpl<String, String, Integer>) table1.mapValues(
+ value -> new Integer(value) * (-1),
+ Materialized.<String, Integer, KeyValueStore<Bytes,
byte[]>>as(storeName3)
+ .withValueSerde(Serdes.Integer()));
final KTableImpl<String, String, Integer> table4 =
(KTableImpl<String, String, Integer>)
table1.mapValues(Integer::new);
@@ -189,8 +200,9 @@ public class KTableMapValuesTest {
final String topic1 = "topic1";
final KTableImpl<String, String, String> table1 =
- (KTableImpl<String, String, String>) builder.table(topic1,
consumed);
- final KTableImpl<String, String, Integer> table2 = (KTableImpl<String,
String, Integer>) table1.mapValues(Integer::new);
+ (KTableImpl<String, String, String>) builder.table(topic1,
consumed);
+ final KTableImpl<String, String, Integer> table2 =
+ (KTableImpl<String, String, Integer>)
table1.mapValues(Integer::new);
final MockProcessorSupplier<String, Integer> supplier = new
MockProcessorSupplier<>();
@@ -231,8 +243,9 @@ public class KTableMapValuesTest {
final String topic1 = "topic1";
final KTableImpl<String, String, String> table1 =
- (KTableImpl<String, String, String>) builder.table(topic1,
consumed);
- final KTableImpl<String, String, Integer> table2 = (KTableImpl<String,
String, Integer>) table1.mapValues(Integer::new);
+ (KTableImpl<String, String, String>) builder.table(topic1,
consumed);
+ final KTableImpl<String, String, Integer> table2 =
+ (KTableImpl<String, String, Integer>)
table1.mapValues(Integer::new);
table2.enableSendingOldValues();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
index 350b0d2..7da5077 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
@@ -62,8 +62,8 @@ import static org.easymock.EasyMock.verify;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.isA;
-import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
@RunWith(EasyMockRunner.class)
@@ -75,8 +75,8 @@ public class KTableTransformValuesTest {
private static final Consumed<String, String> CONSUMED =
Consumed.with(Serdes.String(), Serdes.String());
- private final ConsumerRecordFactory<String, String> recordFactory
- = new ConsumerRecordFactory<>(new StringSerializer(), new
StringSerializer());
+ private final ConsumerRecordFactory<String, String> recordFactory =
+ new ConsumerRecordFactory<>(new StringSerializer(), new
StringSerializer());
private TopologyTestDriver driver;
private MockProcessorSupplier<String, String> capture;
@@ -140,7 +140,8 @@ public class KTableTransformValuesTest {
@Test
public void
shouldInitializeTransformerWithForwardDisabledProcessorContext() {
final SingletonNoOpValueTransformer<String, String> transformer = new
SingletonNoOpValueTransformer<>();
- final KTableTransformValues<String, String, String> transformValues =
new KTableTransformValues<>(parent, transformer, null);
+ final KTableTransformValues<String, String, String> transformValues =
+ new KTableTransformValues<>(parent, transformer, null);
final Processor<String, Change<String>> processor =
transformValues.get();
processor.init(context);
@@ -329,7 +330,7 @@ public class KTableTransformValuesTest {
driver.pipeInput(recordFactory.create(INPUT_TOPIC, "D", (String) null,
0L));
assertThat(output(), hasItems("A:A->a!", "B:B->b!", "D:D->null!"));
- assertThat("Store should not be materialized",
driver.getKeyValueStore(QUERYABLE_NAME), is(nullValue()));
+ assertNull("Store should not be materialized",
driver.getKeyValueStore(QUERYABLE_NAME));
}
@Test
@@ -412,21 +413,11 @@ public class KTableTransformValuesTest {
}
private static KeyValueMapper<String, Integer, KeyValue<String, Integer>>
toForceSendingOfOldValues() {
- return new KeyValueMapper<String, Integer, KeyValue<String,
Integer>>() {
- @Override
- public KeyValue<String, Integer> apply(final String key, final
Integer value) {
- return new KeyValue<>(key, value);
- }
- };
+ return KeyValue::new;
}
private static ValueMapper<Integer, String> mapBackToStrings() {
- return new ValueMapper<Integer, String>() {
- @Override
- public String apply(final Integer value) {
- return value.toString();
- }
- };
+ return Object::toString;
}
private static StoreBuilder<KeyValueStore<Long, Long>> storeBuilder(final
String storeName) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
index ce0f25e..e0707ce 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
@@ -24,7 +24,6 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
@@ -55,28 +54,26 @@ public class TimeWindowedKStreamImplTest {
private static final String TOPIC = "input";
private final StreamsBuilder builder = new StreamsBuilder();
- private final ConsumerRecordFactory<String, String> recordFactory = new
ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+ private final ConsumerRecordFactory<String, String> recordFactory =
+ new ConsumerRecordFactory<>(new StringSerializer(), new
StringSerializer());
private final Properties props =
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
private TimeWindowedKStream<String, String> windowedStream;
@Before
public void before() {
final KStream<String, String> stream = builder.stream(TOPIC,
Consumed.with(Serdes.String(), Serdes.String()));
- windowedStream = stream.groupByKey(Grouped.with(Serdes.String(),
Serdes.String()))
- .windowedBy(TimeWindows.of(ofMillis(500L)));
+ windowedStream = stream.
+ groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+ .windowedBy(TimeWindows.of(ofMillis(500L)));
}
@Test
public void shouldCountWindowed() {
final Map<Windowed<String>, Long> results = new HashMap<>();
- windowedStream.count()
- .toStream()
- .foreach(new ForeachAction<Windowed<String>, Long>() {
- @Override
- public void apply(final Windowed<String> key, final Long
value) {
- results.put(key, value);
- }
- });
+ windowedStream
+ .count()
+ .toStream()
+ .foreach(results::put);
try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), props, 0L)) {
processData(driver);
@@ -90,14 +87,10 @@ public class TimeWindowedKStreamImplTest {
@Test
public void shouldReduceWindowed() {
final Map<Windowed<String>, String> results = new HashMap<>();
- windowedStream.reduce(MockReducer.STRING_ADDER)
- .toStream()
- .foreach(new ForeachAction<Windowed<String>, String>() {
- @Override
- public void apply(final Windowed<String> key, final String
value) {
- results.put(key, value);
- }
- });
+ windowedStream
+ .reduce(MockReducer.STRING_ADDER)
+ .toStream()
+ .foreach(results::put);
try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), props, 0L)) {
processData(driver);
@@ -110,17 +103,14 @@ public class TimeWindowedKStreamImplTest {
@Test
public void shouldAggregateWindowed() {
final Map<Windowed<String>, String> results = new HashMap<>();
- windowedStream.aggregate(MockInitializer.STRING_INIT,
+ windowedStream
+ .aggregate(
+ MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER,
- Materialized.<String, String, WindowStore<Bytes,
byte[]>>with(Serdes.String(), Serdes.String()
- ))
- .toStream()
- .foreach(new ForeachAction<Windowed<String>, String>() {
- @Override
- public void apply(final Windowed<String> key, final String
value) {
- results.put(key, value);
- }
- });
+ Materialized.with(Serdes.String(), Serdes.String()))
+ .toStream()
+ .foreach(results::put);
+
try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), props, 0L)) {
processData(driver);
}
@@ -131,14 +121,16 @@ public class TimeWindowedKStreamImplTest {
@Test
public void shouldMaterializeCount() {
- windowedStream.count(Materialized.<String, Long, WindowStore<Bytes,
byte[]>>as("count-store")
- .withKeySerde(Serdes.String())
- .withValueSerde(Serdes.Long()));
+ windowedStream.count(
+ Materialized.<String, Long, WindowStore<Bytes,
byte[]>>as("count-store")
+ .withKeySerde(Serdes.String())
+ .withValueSerde(Serdes.Long()));
try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), props, 0L)) {
processData(driver);
final WindowStore<String, Long> windowStore =
driver.getWindowStore("count-store");
- final List<KeyValue<Windowed<String>, Long>> data =
StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0),
ofEpochMilli(1000L)));
+ final List<KeyValue<Windowed<String>, Long>> data =
+ StreamsTestUtils.toList(windowStore.fetch("1", "2",
ofEpochMilli(0), ofEpochMilli(1000L)));
assertThat(data, equalTo(Arrays.asList(
KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)),
2L),
@@ -149,15 +141,17 @@ public class TimeWindowedKStreamImplTest {
@Test
public void shouldMaterializeReduced() {
- windowedStream.reduce(MockReducer.STRING_ADDER,
- Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("reduced")
- .withKeySerde(Serdes.String())
- .withValueSerde(Serdes.String()));
+ windowedStream.reduce(
+ MockReducer.STRING_ADDER,
+ Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("reduced")
+ .withKeySerde(Serdes.String())
+ .withValueSerde(Serdes.String()));
try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), props, 0L)) {
processData(driver);
final WindowStore<String, String> windowStore =
driver.getWindowStore("reduced");
- final List<KeyValue<Windowed<String>, String>> data =
StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0),
ofEpochMilli(1000L)));
+ final List<KeyValue<Windowed<String>, String>> data =
+ StreamsTestUtils.toList(windowStore.fetch("1", "2",
ofEpochMilli(0), ofEpochMilli(1000L)));
assertThat(data, equalTo(Arrays.asList(
KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)),
"1+2"),
@@ -168,16 +162,19 @@ public class TimeWindowedKStreamImplTest {
@Test
public void shouldMaterializeAggregated() {
- windowedStream.aggregate(MockInitializer.STRING_INIT,
- MockAggregator.TOSTRING_ADDER,
- Materialized.<String, String,
WindowStore<Bytes, byte[]>>as("aggregated")
- .withKeySerde(Serdes.String())
- .withValueSerde(Serdes.String()));
+ windowedStream.aggregate(
+ MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
+ Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("aggregated")
+ .withKeySerde(Serdes.String())
+ .withValueSerde(Serdes.String()));
try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), props, 0L)) {
processData(driver);
final WindowStore<String, String> windowStore =
driver.getWindowStore("aggregated");
- final List<KeyValue<Windowed<String>, String>> data =
StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0),
ofEpochMilli(1000L)));
+ final List<KeyValue<Windowed<String>, String>> data =
+ StreamsTestUtils.toList(windowStore.fetch("1", "2",
ofEpochMilli(0), ofEpochMilli(1000L)));
+
assertThat(data, equalTo(Arrays.asList(
KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)),
"0+1+2"),
KeyValue.pair(new Windowed<>("1", new TimeWindow(500,
1000)), "0+3"),
@@ -202,36 +199,41 @@ public class TimeWindowedKStreamImplTest {
@Test(expected = NullPointerException.class)
public void
shouldThrowNullPointerOnMaterializedAggregateIfInitializerIsNull() {
- windowedStream.aggregate(null,
- MockAggregator.TOSTRING_ADDER,
- Materialized.<String, String,
WindowStore<Bytes, byte[]>>as("store"));
+ windowedStream.aggregate(
+ null,
+ MockAggregator.TOSTRING_ADDER,
+ Materialized.as("store"));
}
@Test(expected = NullPointerException.class)
public void
shouldThrowNullPointerOnMaterializedAggregateIfAggregatorIsNull() {
- windowedStream.aggregate(MockInitializer.STRING_INIT,
- null,
- Materialized.<String, String,
WindowStore<Bytes, byte[]>>as("store"));
+ windowedStream.aggregate(
+ MockInitializer.STRING_INIT,
+ null,
+ Materialized.as("store"));
}
@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void
shouldThrowNullPointerOnMaterializedAggregateIfMaterializedIsNull() {
- windowedStream.aggregate(MockInitializer.STRING_INIT,
- MockAggregator.TOSTRING_ADDER,
- (Materialized) null);
+ windowedStream.aggregate(
+ MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
+ (Materialized) null);
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnMaterializedReduceIfReducerIsNull() {
- windowedStream.reduce(null,
- Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("store"));
+ windowedStream.reduce(
+ null,
+ Materialized.as("store"));
}
@Test(expected = NullPointerException.class)
public void
shouldThrowNullPointerOnMaterializedReduceIfMaterializedIsNull() {
- windowedStream.reduce(MockReducer.STRING_ADDER,
- null);
+ windowedStream.reduce(
+ MockReducer.STRING_ADDER,
+ null);
}
@Test(expected = NullPointerException.class)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index 4fe3c07..be64523 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -431,8 +431,8 @@ public class SimpleBenchmark {
setStreamProperties("simple-benchmark-streams-with-store");
final StreamsBuilder builder = new StreamsBuilder();
- final StoreBuilder<KeyValueStore<Integer, byte[]>> storeBuilder
- =
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("store"),
INTEGER_SERDE, BYTE_SERDE);
+ final StoreBuilder<KeyValueStore<Integer, byte[]>> storeBuilder =
+
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("store"),
INTEGER_SERDE, BYTE_SERDE);
builder.addStateStore(storeBuilder.withCachingEnabled());
final KStream<Integer, byte[]> source = builder.stream(topic);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
index b3afa24..a84216e 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -63,8 +63,8 @@ public class AbstractTaskTest {
private final TopicPartition storeTopicPartition2 = new
TopicPartition("t2", 0);
private final TopicPartition storeTopicPartition3 = new
TopicPartition("t3", 0);
private final TopicPartition storeTopicPartition4 = new
TopicPartition("t4", 0);
- private final Collection<TopicPartition> storeTopicPartitions
- = Utils.mkSet(storeTopicPartition1, storeTopicPartition2,
storeTopicPartition3, storeTopicPartition4);
+ private final Collection<TopicPartition> storeTopicPartitions =
+ Utils.mkSet(storeTopicPartition1, storeTopicPartition2,
storeTopicPartition3, storeTopicPartition4);
@Before
public void before() {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 183f9ae..475b5e9 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -187,8 +187,8 @@ public class ProcessorStateManagerTest {
@Test
public void testRegisterNonPersistentStore() throws IOException {
- final MockKeyValueStore nonPersistentStore
- = new MockKeyValueStore(nonPersistentStoreName, false); // non
persistent store
+ final MockKeyValueStore nonPersistentStore =
+ new MockKeyValueStore(nonPersistentStoreName, false); // non
persistent store
final ProcessorStateManager stateMgr = new ProcessorStateManager(
new TaskId(0, 2),
noPartitions,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java
index 00b440f..3ecf6ef 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java
@@ -107,10 +107,11 @@ public class CompositeReadOnlySessionStoreTest {
@Test(expected = InvalidStateStoreException.class)
public void shouldThrowInvalidStateStoreExceptionOnRebalance() {
- final CompositeReadOnlySessionStore<String, String> store
- = new CompositeReadOnlySessionStore<>(new
StateStoreProviderStub(true),
-
QueryableStoreTypes.<String, String>sessionStore(),
- "whateva");
+ final CompositeReadOnlySessionStore<String, String> store =
+ new CompositeReadOnlySessionStore<>(
+ new StateStoreProviderStub(true),
+ QueryableStoreTypes.<String, String>sessionStore(),
+ "whateva");
store.fetch("a");
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
index a80e28b..2a35342 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
@@ -82,8 +82,8 @@ public class RocksDBSessionStoreTest {
sessionStore.put(new Windowed<>(key, new SessionWindow(1500L, 2000L)),
1L);
sessionStore.put(new Windowed<>(key, new SessionWindow(2500L, 3000L)),
2L);
- final List<KeyValue<Windowed<String>, Long>> expected
- = Arrays.asList(KeyValue.pair(a1, 1L), KeyValue.pair(a2, 2L));
+ final List<KeyValue<Windowed<String>, Long>> expected =
+ Arrays.asList(KeyValue.pair(a1, 1L), KeyValue.pair(a2, 2L));
final KeyValueIterator<Windowed<String>, Long> values =
sessionStore.findSessions(key, 0, 1000L);
assertEquals(expected, toList(values));
diff --git
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index a90afe7..432b9f8 100644
---
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -315,8 +315,8 @@ public class TopologyTestDriver implements Closeable {
stateRestoreListener,
streamsConfig);
- final GlobalProcessorContextImpl globalProcessorContext
- = new GlobalProcessorContextImpl(streamsConfig,
globalStateManager, streamsMetrics, cache);
+ final GlobalProcessorContextImpl globalProcessorContext =
+ new GlobalProcessorContextImpl(streamsConfig,
globalStateManager, streamsMetrics, cache);
globalStateManager.setGlobalProcessorContext(globalProcessorContext);
globalStateTask = new GlobalStateUpdateTask(