Repository: kafka Updated Branches: refs/heads/trunk a7312971a -> 845c6eae1
http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java index 785d3e8..be5c728 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java @@ -430,6 +430,32 @@ public class TopologyBuilder { } /** + * Connects a list of processors. + * + * NOTE this function would not needed by developers working with the processor APIs, but only used + * for the high-level DSL parsing functionalities. + * + * @param processorNames the name of the processors + * @return this builder instance so methods can be chained together; never null + */ + public final TopologyBuilder connectProcessors(String... processorNames) { + if (processorNames.length < 2) + throw new TopologyBuilderException("At least two processors need to participate in the connection."); + + for (String processorName : processorNames) { + if (!nodeFactories.containsKey(processorName)) + throw new TopologyBuilderException("Processor " + processorName + " is not added yet."); + + } + + String firstProcessorName = processorNames[0]; + + nodeGrouper.unite(firstProcessorName, Arrays.copyOfRange(processorNames, 1, processorNames.length)); + + return this; + } + + /** * Adds an internal topic * * @param topicName the name of the topic http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java index 0787204..62bf307 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java @@ -87,6 +87,14 @@ public class RecordQueue { timeTracker.addElement(stampedRecord); } + // update the partition timestamp if its currently + // tracked min timestamp has exceed its value; this will + // usually only take effect for the first added batch + long timestamp = timeTracker.get(); + + if (timestamp > partitionTime) + partitionTime = timestamp; + return size(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java index 440efc8..1cc5287 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java @@ -118,7 +118,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable if (configs.containsKey(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG)) { internalTopicManager = new InternalTopicManager( (String) configs.get(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG), - (Integer) configs.get(StreamsConfig.REPLICATION_FACTOR_CONFIG)); + configs.containsKey(StreamsConfig.REPLICATION_FACTOR_CONFIG) ? (Integer) configs.get(StreamsConfig.REPLICATION_FACTOR_CONFIG) : 1); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 7392d9e..e35eb89 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -279,7 +279,6 @@ public class StreamThread extends Thread { log.info("Shutting down stream thread [" + this.getName() + "]"); // Exceptions should not prevent this call from going through all shutdown steps - try { commitAll(); } catch (Throwable e) { http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java deleted file mode 100644 index 93c5df6..0000000 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java +++ /dev/null @@ -1,294 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.streams.kstream.Aggregator; -import org.apache.kafka.streams.kstream.HoppingWindows; -import org.apache.kafka.streams.kstream.Initializer; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KStreamBuilder; -import org.apache.kafka.streams.kstream.KTable; -import org.apache.kafka.streams.kstream.ValueJoiner; -import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.test.KStreamTestDriver; -import org.apache.kafka.test.MockProcessorSupplier; -import org.junit.Test; - -import java.io.File; -import java.nio.file.Files; - -import static org.junit.Assert.assertEquals; - -public class KStreamAggregateTest { - - private final Serializer<String> strSerializer = new StringSerializer(); - private final Deserializer<String> strDeserializer = new StringDeserializer(); - - private class StringAdd implements Aggregator<String, String, String> { - - @Override - public String apply(String aggKey, String value, String aggregate) { - return aggregate + "+" + value; - } - } - - private class StringInit implements Initializer<String> { - - @Override - public String apply() { - return "0"; - } - } - - @Test - public void testAggBasic() throws Exception { - final File baseDir = Files.createTempDirectory("test").toFile(); - - try { - final KStreamBuilder builder = new KStreamBuilder(); - String topic1 = "topic1"; - - KStream<String, String> stream1 = builder.stream(strDeserializer, strDeserializer, topic1); - KTable<Windowed<String>, String> table2 = stream1.aggregateByKey(new StringInit(), new StringAdd(), - HoppingWindows.of("topic1-Canonized").with(10L).every(5L), - strSerializer, - strSerializer, - strDeserializer, - strDeserializer); - - MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>(); - table2.toStream().process(proc2); - - KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir); - - driver.setTime(0L); - driver.process(topic1, "A", "1"); - driver.setTime(1L); - driver.process(topic1, "B", "2"); - driver.setTime(2L); - driver.process(topic1, "C", "3"); - driver.setTime(3L); - driver.process(topic1, "D", "4"); - driver.setTime(4L); - driver.process(topic1, "A", "1"); - - driver.setTime(5L); - driver.process(topic1, "A", "1"); - driver.setTime(6L); - driver.process(topic1, "B", "2"); - driver.setTime(7L); - driver.process(topic1, "D", "4"); - driver.setTime(8L); - driver.process(topic1, "B", "2"); - driver.setTime(9L); - driver.process(topic1, "C", "3"); - - driver.setTime(10L); - driver.process(topic1, "A", "1"); - driver.setTime(11L); - driver.process(topic1, "B", "2"); - driver.setTime(12L); - driver.process(topic1, "D", "4"); - driver.setTime(13L); - driver.process(topic1, "B", "2"); - driver.setTime(14L); - driver.process(topic1, "C", "3"); - - assertEquals(Utils.mkList( - "[A@0]:0+1", - "[B@0]:0+2", - "[C@0]:0+3", - "[D@0]:0+4", - "[A@0]:0+1+1", - - "[A@0]:0+1+1+1", "[A@5]:0+1", - "[B@0]:0+2+2", "[B@5]:0+2", - "[D@0]:0+4+4", "[D@5]:0+4", - "[B@0]:0+2+2+2", "[B@5]:0+2+2", - "[C@0]:0+3+3", "[C@5]:0+3", - - "[A@5]:0+1+1", "[A@10]:0+1", - "[B@5]:0+2+2+2", "[B@10]:0+2", - "[D@5]:0+4+4", "[D@10]:0+4", - "[B@5]:0+2+2+2+2", "[B@10]:0+2+2", - "[C@5]:0+3+3", "[C@10]:0+3"), proc2.processed); - - } finally { - Utils.delete(baseDir); - } - } - - @Test - public void testJoin() throws Exception { - final File baseDir = Files.createTempDirectory("test").toFile(); - - try { - final KStreamBuilder builder = new KStreamBuilder(); - String topic1 = "topic1"; - String topic2 = "topic2"; - - KStream<String, String> stream1 = builder.stream(strDeserializer, strDeserializer, topic1); - KTable<Windowed<String>, String> table1 = stream1.aggregateByKey(new StringInit(), new StringAdd(), - HoppingWindows.of("topic1-Canonized").with(10L).every(5L), - strSerializer, - strSerializer, - strDeserializer, - strDeserializer); - - MockProcessorSupplier<Windowed<String>, String> proc1 = new MockProcessorSupplier<>(); - table1.toStream().process(proc1); - - KStream<String, String> stream2 = builder.stream(strDeserializer, strDeserializer, topic2); - KTable<Windowed<String>, String> table2 = stream2.aggregateByKey(new StringInit(), new StringAdd(), - HoppingWindows.of("topic2-Canonized").with(10L).every(5L), - strSerializer, - strSerializer, - strDeserializer, - strDeserializer); - - MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>(); - table2.toStream().process(proc2); - - - MockProcessorSupplier<Windowed<String>, String> proc3 = new MockProcessorSupplier<>(); - table1.join(table2, new ValueJoiner<String, String, String>() { - @Override - public String apply(String p1, String p2) { - return p1 + "%" + p2; - } - }).toStream().process(proc3); - - KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir); - - driver.setTime(0L); - driver.process(topic1, "A", "1"); - driver.setTime(1L); - driver.process(topic1, "B", "2"); - driver.setTime(2L); - driver.process(topic1, "C", "3"); - driver.setTime(3L); - driver.process(topic1, "D", "4"); - driver.setTime(4L); - driver.process(topic1, "A", "1"); - - proc1.checkAndClearResult( - "[A@0]:0+1", - "[B@0]:0+2", - "[C@0]:0+3", - "[D@0]:0+4", - "[A@0]:0+1+1" - ); - proc2.checkAndClearResult(); - proc3.checkAndClearResult( - "[A@0]:null", - "[B@0]:null", - "[C@0]:null", - "[D@0]:null", - "[A@0]:null" - ); - - driver.setTime(5L); - driver.process(topic1, "A", "1"); - driver.setTime(6L); - driver.process(topic1, "B", "2"); - driver.setTime(7L); - driver.process(topic1, "D", "4"); - driver.setTime(8L); - driver.process(topic1, "B", "2"); - driver.setTime(9L); - driver.process(topic1, "C", "3"); - - proc1.checkAndClearResult( - "[A@0]:0+1+1+1", "[A@5]:0+1", - "[B@0]:0+2+2", "[B@5]:0+2", - "[D@0]:0+4+4", "[D@5]:0+4", - "[B@0]:0+2+2+2", "[B@5]:0+2+2", - "[C@0]:0+3+3", "[C@5]:0+3" - ); - proc2.checkAndClearResult(); - proc3.checkAndClearResult( - "[A@0]:null", "[A@5]:null", - "[B@0]:null", "[B@5]:null", - "[D@0]:null", "[D@5]:null", - "[B@0]:null", "[B@5]:null", - "[C@0]:null", "[C@5]:null" - ); - - driver.setTime(0L); - driver.process(topic2, "A", "a"); - driver.setTime(1L); - driver.process(topic2, "B", "b"); - driver.setTime(2L); - driver.process(topic2, "C", "c"); - driver.setTime(3L); - driver.process(topic2, "D", "d"); - driver.setTime(4L); - driver.process(topic2, "A", "a"); - - proc1.checkAndClearResult(); - proc2.checkAndClearResult( - "[A@0]:0+a", - "[B@0]:0+b", - "[C@0]:0+c", - "[D@0]:0+d", - "[A@0]:0+a+a" - ); - proc3.checkAndClearResult( - "[A@0]:0+1+1+1%0+a", - "[B@0]:0+2+2+2%0+b", - "[C@0]:0+3+3%0+c", - "[D@0]:0+4+4%0+d", - "[A@0]:0+1+1+1%0+a+a"); - - driver.setTime(5L); - driver.process(topic2, "A", "a"); - driver.setTime(6L); - driver.process(topic2, "B", "b"); - driver.setTime(7L); - driver.process(topic2, "D", "d"); - driver.setTime(8L); - driver.process(topic2, "B", "b"); - driver.setTime(9L); - driver.process(topic2, "C", "c"); - - proc1.checkAndClearResult(); - proc2.checkAndClearResult( - "[A@0]:0+a+a+a", "[A@5]:0+a", - "[B@0]:0+b+b", "[B@5]:0+b", - "[D@0]:0+d+d", "[D@5]:0+d", - "[B@0]:0+b+b+b", "[B@5]:0+b+b", - "[C@0]:0+c+c", "[C@5]:0+c" - ); - proc3.checkAndClearResult( - "[A@0]:0+1+1+1%0+a+a+a", "[A@5]:0+1%0+a", - "[B@0]:0+2+2+2%0+b+b", "[B@5]:0+2+2%0+b", - "[D@0]:0+4+4%0+d+d", "[D@5]:0+4%0+d", - "[B@0]:0+2+2+2%0+b+b+b", "[B@5]:0+2+2%0+b+b", - "[C@0]:0+3+3%0+c+c", "[C@5]:0+3%0+c" - ); - - } finally { - Utils.delete(baseDir); - } - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java new file mode 100644 index 0000000..9e0745a --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java @@ -0,0 +1,294 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.HoppingWindows; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.ValueJoiner; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.test.MockProcessorSupplier; +import org.junit.Test; + +import java.io.File; +import java.nio.file.Files; + +import static org.junit.Assert.assertEquals; + +public class KStreamWindowAggregateTest { + + private final Serializer<String> strSerializer = new StringSerializer(); + private final Deserializer<String> strDeserializer = new StringDeserializer(); + + private class StringAdd implements Aggregator<String, String, String> { + + @Override + public String apply(String aggKey, String value, String aggregate) { + return aggregate + "+" + value; + } + } + + private class StringInit implements Initializer<String> { + + @Override + public String apply() { + return "0"; + } + } + + @Test + public void testAggBasic() throws Exception { + final File baseDir = Files.createTempDirectory("test").toFile(); + + try { + final KStreamBuilder builder = new KStreamBuilder(); + String topic1 = "topic1"; + + KStream<String, String> stream1 = builder.stream(strDeserializer, strDeserializer, topic1); + KTable<Windowed<String>, String> table2 = stream1.aggregateByKey(new StringInit(), new StringAdd(), + HoppingWindows.of("topic1-Canonized").with(10L).every(5L), + strSerializer, + strSerializer, + strDeserializer, + strDeserializer); + + MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>(); + table2.toStream().process(proc2); + + KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir); + + driver.setTime(0L); + driver.process(topic1, "A", "1"); + driver.setTime(1L); + driver.process(topic1, "B", "2"); + driver.setTime(2L); + driver.process(topic1, "C", "3"); + driver.setTime(3L); + driver.process(topic1, "D", "4"); + driver.setTime(4L); + driver.process(topic1, "A", "1"); + + driver.setTime(5L); + driver.process(topic1, "A", "1"); + driver.setTime(6L); + driver.process(topic1, "B", "2"); + driver.setTime(7L); + driver.process(topic1, "D", "4"); + driver.setTime(8L); + driver.process(topic1, "B", "2"); + driver.setTime(9L); + driver.process(topic1, "C", "3"); + + driver.setTime(10L); + driver.process(topic1, "A", "1"); + driver.setTime(11L); + driver.process(topic1, "B", "2"); + driver.setTime(12L); + driver.process(topic1, "D", "4"); + driver.setTime(13L); + driver.process(topic1, "B", "2"); + driver.setTime(14L); + driver.process(topic1, "C", "3"); + + assertEquals(Utils.mkList( + "[A@0]:0+1", + "[B@0]:0+2", + "[C@0]:0+3", + "[D@0]:0+4", + "[A@0]:0+1+1", + + "[A@0]:0+1+1+1", "[A@5]:0+1", + "[B@0]:0+2+2", "[B@5]:0+2", + "[D@0]:0+4+4", "[D@5]:0+4", + "[B@0]:0+2+2+2", "[B@5]:0+2+2", + "[C@0]:0+3+3", "[C@5]:0+3", + + "[A@5]:0+1+1", "[A@10]:0+1", + "[B@5]:0+2+2+2", "[B@10]:0+2", + "[D@5]:0+4+4", "[D@10]:0+4", + "[B@5]:0+2+2+2+2", "[B@10]:0+2+2", + "[C@5]:0+3+3", "[C@10]:0+3"), proc2.processed); + + } finally { + Utils.delete(baseDir); + } + } + + @Test + public void testJoin() throws Exception { + final File baseDir = Files.createTempDirectory("test").toFile(); + + try { + final KStreamBuilder builder = new KStreamBuilder(); + String topic1 = "topic1"; + String topic2 = "topic2"; + + KStream<String, String> stream1 = builder.stream(strDeserializer, strDeserializer, topic1); + KTable<Windowed<String>, String> table1 = stream1.aggregateByKey(new StringInit(), new StringAdd(), + HoppingWindows.of("topic1-Canonized").with(10L).every(5L), + strSerializer, + strSerializer, + strDeserializer, + strDeserializer); + + MockProcessorSupplier<Windowed<String>, String> proc1 = new MockProcessorSupplier<>(); + table1.toStream().process(proc1); + + KStream<String, String> stream2 = builder.stream(strDeserializer, strDeserializer, topic2); + KTable<Windowed<String>, String> table2 = stream2.aggregateByKey(new StringInit(), new StringAdd(), + HoppingWindows.of("topic2-Canonized").with(10L).every(5L), + strSerializer, + strSerializer, + strDeserializer, + strDeserializer); + + MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>(); + table2.toStream().process(proc2); + + + MockProcessorSupplier<Windowed<String>, String> proc3 = new MockProcessorSupplier<>(); + table1.join(table2, new ValueJoiner<String, String, String>() { + @Override + public String apply(String p1, String p2) { + return p1 + "%" + p2; + } + }).toStream().process(proc3); + + KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir); + + driver.setTime(0L); + driver.process(topic1, "A", "1"); + driver.setTime(1L); + driver.process(topic1, "B", "2"); + driver.setTime(2L); + driver.process(topic1, "C", "3"); + driver.setTime(3L); + driver.process(topic1, "D", "4"); + driver.setTime(4L); + driver.process(topic1, "A", "1"); + + proc1.checkAndClearResult( + "[A@0]:0+1", + "[B@0]:0+2", + "[C@0]:0+3", + "[D@0]:0+4", + "[A@0]:0+1+1" + ); + proc2.checkAndClearResult(); + proc3.checkAndClearResult( + "[A@0]:null", + "[B@0]:null", + "[C@0]:null", + "[D@0]:null", + "[A@0]:null" + ); + + driver.setTime(5L); + driver.process(topic1, "A", "1"); + driver.setTime(6L); + driver.process(topic1, "B", "2"); + driver.setTime(7L); + driver.process(topic1, "D", "4"); + driver.setTime(8L); + driver.process(topic1, "B", "2"); + driver.setTime(9L); + driver.process(topic1, "C", "3"); + + proc1.checkAndClearResult( + "[A@0]:0+1+1+1", "[A@5]:0+1", + "[B@0]:0+2+2", "[B@5]:0+2", + "[D@0]:0+4+4", "[D@5]:0+4", + "[B@0]:0+2+2+2", "[B@5]:0+2+2", + "[C@0]:0+3+3", "[C@5]:0+3" + ); + proc2.checkAndClearResult(); + proc3.checkAndClearResult( + "[A@0]:null", "[A@5]:null", + "[B@0]:null", "[B@5]:null", + "[D@0]:null", "[D@5]:null", + "[B@0]:null", "[B@5]:null", + "[C@0]:null", "[C@5]:null" + ); + + driver.setTime(0L); + driver.process(topic2, "A", "a"); + driver.setTime(1L); + driver.process(topic2, "B", "b"); + driver.setTime(2L); + driver.process(topic2, "C", "c"); + driver.setTime(3L); + driver.process(topic2, "D", "d"); + driver.setTime(4L); + driver.process(topic2, "A", "a"); + + proc1.checkAndClearResult(); + proc2.checkAndClearResult( + "[A@0]:0+a", + "[B@0]:0+b", + "[C@0]:0+c", + "[D@0]:0+d", + "[A@0]:0+a+a" + ); + proc3.checkAndClearResult( + "[A@0]:0+1+1+1%0+a", + "[B@0]:0+2+2+2%0+b", + "[C@0]:0+3+3%0+c", + "[D@0]:0+4+4%0+d", + "[A@0]:0+1+1+1%0+a+a"); + + driver.setTime(5L); + driver.process(topic2, "A", "a"); + driver.setTime(6L); + driver.process(topic2, "B", "b"); + driver.setTime(7L); + driver.process(topic2, "D", "d"); + driver.setTime(8L); + driver.process(topic2, "B", "b"); + driver.setTime(9L); + driver.process(topic2, "C", "c"); + + proc1.checkAndClearResult(); + proc2.checkAndClearResult( + "[A@0]:0+a+a+a", "[A@5]:0+a", + "[B@0]:0+b+b", "[B@5]:0+b", + "[D@0]:0+d+d", "[D@5]:0+d", + "[B@0]:0+b+b+b", "[B@5]:0+b+b", + "[C@0]:0+c+c", "[C@5]:0+c" + ); + proc3.checkAndClearResult( + "[A@0]:0+1+1+1%0+a+a+a", "[A@5]:0+1%0+a", + "[B@0]:0+2+2+2%0+b+b", "[B@5]:0+2+2%0+b", + "[D@0]:0+4+4%0+d+d", "[D@5]:0+4%0+d", + "[B@0]:0+2+2+2%0+b+b+b", "[B@5]:0+2+2%0+b+b", + "[C@0]:0+3+3%0+c+c", "[C@5]:0+3%0+c" + ); + + } finally { + Utils.delete(baseDir); + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java index 59711db..ec85ed7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java @@ -65,6 +65,7 @@ public class KTableAggregateTest { } } + @Test public void testAggBasic() throws Exception { final File baseDir = Files.createTempDirectory("test").toFile(); http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java index 9d0c0e2..61f6dbf 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java @@ -77,7 +77,7 @@ public class PartitionGroupTest { assertEquals(6, group.numBuffered()); assertEquals(3, group.numBuffered(partition1)); assertEquals(3, group.numBuffered(partition2)); - assertEquals(TimestampTracker.NOT_KNOWN, group.timestamp()); + assertEquals(1L, group.timestamp()); StampedRecord record; PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo(); @@ -89,7 +89,7 @@ public class PartitionGroupTest { assertEquals(5, group.numBuffered()); assertEquals(2, group.numBuffered(partition1)); assertEquals(3, group.numBuffered(partition2)); - assertEquals(TimestampTracker.NOT_KNOWN, group.timestamp()); + assertEquals(2L, group.timestamp()); // get one record, now the time should be advanced record = group.nextRecord(info); http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java index 614e2c7..36f38e6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java @@ -58,7 +58,7 @@ public class RecordQueueTest { queue.addRawRecords(list1, timestampExtractor); assertEquals(3, queue.size()); - assertEquals(TimestampTracker.NOT_KNOWN, queue.timestamp()); + assertEquals(1L, queue.timestamp()); // poll the first record, now with 1, 3 assertEquals(2L, queue.poll().timestamp); @@ -107,7 +107,7 @@ public class RecordQueueTest { queue.addRawRecords(list3, timestampExtractor); assertEquals(3, queue.size()); - assertEquals(3L, queue.timestamp()); + assertEquals(4L, queue.timestamp()); // poll one record again, the timestamp should advance now assertEquals(4L, queue.poll().timestamp); http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 94f0ce3..0430566 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -124,8 +124,8 @@ public class StreamTaskTest { assertEquals(0, source2.numReceived); assertEquals(4, task.process()); - assertEquals(1, source1.numReceived); - assertEquals(1, source2.numReceived); + assertEquals(2, source1.numReceived); + assertEquals(0, source2.numReceived); assertEquals(3, task.process()); assertEquals(2, source1.numReceived); @@ -188,14 +188,21 @@ public class StreamTaskTest { assertTrue(consumer.paused().contains(partition2)); assertEquals(7, task.process()); - assertEquals(1, source1.numReceived); - assertEquals(1, source2.numReceived); + assertEquals(2, source1.numReceived); + assertEquals(0, source2.numReceived); assertEquals(1, consumer.paused().size()); - assertTrue(consumer.paused().contains(partition1)); + assertTrue(consumer.paused().contains(partition2)); assertEquals(6, task.process()); - assertEquals(2, source1.numReceived); + assertEquals(3, source1.numReceived); + assertEquals(0, source2.numReceived); + + assertEquals(1, consumer.paused().size()); + assertTrue(consumer.paused().contains(partition2)); + + assertEquals(5, task.process()); + assertEquals(3, source1.numReceived); assertEquals(1, source2.numReceived); assertEquals(0, consumer.paused().size());
