This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 47796d2 MINOR: Fix deprecation warnings in SlidingWindowedCogroupedKStreamImplTest (#10703)
47796d2 is described below
commit 47796d2f8781447c8e7de716841db438cde9cc3c
Author: Ismael Juma <[email protected]>
AuthorDate: Sat May 22 14:22:42 2021 -0700
MINOR: Fix deprecation warnings in SlidingWindowedCogroupedKStreamImplTest (#10703)
Reviewers: Matthias J. Sax <[email protected]>
---
.../internals/SlidingWindowedCogroupedKStreamImplTest.java | 12 +++++++-----
1 file changed, 7 insertions(+), 5 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java
index 6be96b9..5a06341 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java
@@ -59,6 +59,7 @@ public class SlidingWindowedCogroupedKStreamImplTest {
private static final String TOPIC = "topic";
private static final String TOPIC2 = "topic2";
private static final String OUTPUT = "output";
+ private static final long WINDOW_SIZE_MS = 500L;
private final StreamsBuilder builder = new StreamsBuilder();
private KGroupedStream<String, String> groupedStream;
@@ -80,7 +81,8 @@ public class SlidingWindowedCogroupedKStreamImplTest {
groupedStream2 = stream2.groupByKey(Grouped.with(Serdes.String(),
Serdes.String()));
cogroupedStream = groupedStream.cogroup(MockAggregator.TOSTRING_ADDER)
.cogroup(groupedStream2, MockAggregator.TOSTRING_REMOVER);
- windowedCogroupedStream =
cogroupedStream.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(500L),
ofMillis(2000L)));
+ windowedCogroupedStream =
cogroupedStream.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(
+ WINDOW_SIZE_MS), ofMillis(2000L)));
}
@Test
@@ -130,7 +132,7 @@ public class SlidingWindowedCogroupedKStreamImplTest {
.with(Serdes.String(), Serdes.String()));
groupedStream = stream.groupByKey(Grouped.with(Serdes.String(),
Serdes.String()));
groupedStream.cogroup(MockAggregator.TOSTRING_ADDER)
-
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(500L),
ofMillis(2000L)))
+
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(WINDOW_SIZE_MS),
ofMillis(2000L)))
.aggregate(MockInitializer.STRING_INIT, Named.as("foo"));
assertThat(builder.build().describe().toString(), equalTo(
@@ -156,7 +158,7 @@ public class SlidingWindowedCogroupedKStreamImplTest {
final TestInputTopic<String, String> testInputTopic =
driver.createInputTopic(
TOPIC, new StringSerializer(), new StringSerializer());
final TestOutputTopic<Windowed<String>, String> testOutputTopic =
driver.createOutputTopic(
- OUTPUT, new TimeWindowedDeserializer<>(new
StringDeserializer()), new StringDeserializer());
+ OUTPUT, new TimeWindowedDeserializer<>(new
StringDeserializer(), WINDOW_SIZE_MS), new StringDeserializer());
testInputTopic.pipeInput("k1", "A", 500);
testInputTopic.pipeInput("k2", "A", 500);
@@ -204,7 +206,7 @@ public class SlidingWindowedCogroupedKStreamImplTest {
public void slidingWindowAggregateOverlappingWindowsTest() {
final KTable<Windowed<String>, String> customers =
groupedStream.cogroup(MockAggregator.TOSTRING_ADDER)
-
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(500L),
ofMillis(2000L))).aggregate(
+
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(WINDOW_SIZE_MS),
ofMillis(2000L))).aggregate(
MockInitializer.STRING_INIT,
Materialized.with(Serdes.String(), Serdes.String()));
customers.toStream().to(OUTPUT);
@@ -212,7 +214,7 @@ public class SlidingWindowedCogroupedKStreamImplTest {
final TestInputTopic<String, String> testInputTopic =
driver.createInputTopic(
TOPIC, new StringSerializer(), new StringSerializer());
final TestOutputTopic<Windowed<String>, String> testOutputTopic =
driver.createOutputTopic(
- OUTPUT, new TimeWindowedDeserializer<>(new
StringDeserializer()), new StringDeserializer());
+ OUTPUT, new TimeWindowedDeserializer<>(new
StringDeserializer(), WINDOW_SIZE_MS), new StringDeserializer());
testInputTopic.pipeInput("k1", "A", 500);
testInputTopic.pipeInput("k2", "A", 500);