[ https://issues.apache.org/jira/browse/KAFKA-3625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16359094#comment-16359094 ]

ASF GitHub Bot commented on KAFKA-3625:
---------------------------------------

mjsax closed pull request #4502: KAFKA-3625: TopologyTestDriver must process 
output for wall-clock-time punctuations and on close()
URL: https://github.com/apache/kafka/pull/4502
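
For context, a minimal, self-contained sketch of the behavior this change
enables (the topology, processor, and topic names below are illustrative, not
taken from the patch): after advanceWallClockTime() fires a WALL_CLOCK_TIME
punctuation, readOutput() now returns the records the punctuator produced,
instead of leaving them stranded in the driver's mock producer.

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;

public class WallClockPunctuationSketch {
    public static void main(final String[] args) {
        final Topology topology = new Topology();
        topology.addSource("source", "input-topic");
        // "ticker" schedules a wall-clock punctuator that forwards one record
        // per minute of (mocked) wall-clock time.
        topology.addProcessor("ticker", () -> new AbstractProcessor<String, String>() {
            @Override
            public void init(final ProcessorContext context) {
                super.init(context);
                context.schedule(60000L, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
                    @Override
                    public void punctuate(final long timestamp) {
                        context.forward("tick", Long.toString(timestamp));
                    }
                });
            }
            @Override
            public void process(final String key, final String value) {}
        }, "source");
        topology.addSink("sink", "output-topic", "ticker");

        final Properties config = new Properties();
        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "wall-clock-sketch");
        config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        final TopologyTestDriver driver = new TopologyTestDriver(topology, config);
        // Before this patch the punctuation output stayed in the mock producer
        // (and was discarded by the next pipeInput() call), so readOutput()
        // returned null here.
        driver.advanceWallClockTime(60000L);
        final ProducerRecord<String, String> tick =
            driver.readOutput("output-topic", new StringDeserializer(), new StringDeserializer());
        System.out.println(tick); // key "tick", value = punctuation timestamp
        driver.close();
    }
}

The same captureOutputRecords() hook added by the patch also runs on close(),
so records emitted while the driver shuts down remain readable as well.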
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index ff6355448e0..a108f2256cb 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -328,42 +328,14 @@ public void pipeInput(final ConsumerRecord<byte[], byte[]> consumerRecord) {
                 consumerRecord.serializedValueSize(),
                 consumerRecord.key(),
                 consumerRecord.value())));
-            producer.clear();
 
             // Process the record ...
             ((InternalProcessorContext) task.context()).setRecordContext(new ProcessorRecordContext(consumerRecord.timestamp(), offset, topicPartition.partition(), topicName));
             task.process();
             task.maybePunctuateStreamTime();
             task.commit();
+            captureOutputRecords();
 
-            // Capture all the records sent to the producer ...
-            for (final ProducerRecord<byte[], byte[]> record : producer.history()) {
-                Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(record.topic());
-                if (outputRecords == null) {
-                    outputRecords = new LinkedList<>();
-                    outputRecordsByTopic.put(record.topic(), outputRecords);
-                }
-                outputRecords.add(record);
-
-                // Forward back into the topology if the produced record is to an internal or a source topic ...
-                final String outputTopicName = record.topic();
-                if (internalTopics.contains(outputTopicName) || processorTopology.sourceTopics().contains(outputTopicName)) {
-                    final byte[] serializedKey = record.key();
-                    final byte[] serializedValue = record.value();
-
-                    pipeInput(new ConsumerRecord<>(
-                        outputTopicName,
-                        -1,
-                        -1L,
-                        record.timestamp(),
-                        TimestampType.CREATE_TIME,
-                        0L,
-                        serializedKey == null ? 0 : serializedKey.length,
-                        serializedValue == null ? 0 : serializedValue.length,
-                        serializedKey,
-                        serializedValue));
-                }
-            }
         } else {
             final TopicPartition globalTopicPartition = globalPartitionsByTopic.get(topicName);
             if (globalTopicPartition == null) {
@@ -385,6 +357,38 @@ public void pipeInput(final ConsumerRecord<byte[], byte[]> consumerRecord) {
         }
     }
 
+    private void captureOutputRecords() {
+        // Capture all the records sent to the producer ...
+        final List<ProducerRecord<byte[], byte[]>> output = producer.history();
+        producer.clear();
+        for (final ProducerRecord<byte[], byte[]> record : output) {
+            Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(record.topic());
+            if (outputRecords == null) {
+                outputRecords = new LinkedList<>();
+                outputRecordsByTopic.put(record.topic(), outputRecords);
+            }
+            outputRecords.add(record);
+
+            // Forward back into the topology if the produced record is to an internal or a source topic ...
+            final String outputTopicName = record.topic();
+            if (internalTopics.contains(outputTopicName) || processorTopology.sourceTopics().contains(outputTopicName)) {
+                final byte[] serializedKey = record.key();
+                final byte[] serializedValue = record.value();
+
+                pipeInput(new ConsumerRecord<>(
+                    outputTopicName,
+                    -1,
+                    -1L,
+                    record.timestamp(),
+                    TimestampType.CREATE_TIME,
+                    0L,
+                    serializedKey == null ? 0 : serializedKey.length,
+                    serializedValue == null ? 0 : serializedValue.length,
+                    serializedKey,
+                    serializedValue));
+            }
+        }
+    }
     /**
     * Send input messages to the topology and then commit each message individually.
      *
@@ -407,6 +411,7 @@ public void advanceWallClockTime(final long advanceMs) {
         mockTime.sleep(advanceMs);
         task.maybePunctuateSystemTime();
         task.commit();
+        captureOutputRecords();
     }
 
     /**
@@ -558,6 +563,7 @@ public void close() {
                 // ignore
             }
         }
+        captureOutputRecords();
     }
 
     static class MockTime implements Time {
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 921f6d6bed7..17d5e02f4be 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -19,8 +19,12 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.streams.processor.Processor;
@@ -28,12 +32,15 @@
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.streams.test.OutputVerifier;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -81,6 +88,14 @@
             put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
         }
     };
+    private KeyValueStore<String, Long> store;
+
+    private StringDeserializer stringDeserializer = new StringDeserializer();
+    private LongDeserializer longDeserializer = new LongDeserializer();
+    private ConsumerRecordFactory<String, Long> recordFactory = new ConsumerRecordFactory<>(
+        new StringSerializer(),
+        new LongSerializer());
+
 
     private final static class Record {
         private Object key;
@@ -223,7 +238,9 @@ public Processor get() {
 
     @After
     public void tearDown() {
-        testDriver.close();
+        if (testDriver != null) {
+            testDriver.close();
+        }
     }
 
     private Topology setupSourceSinkTopology() {
@@ -417,7 +434,7 @@ public void shouldUseSourceSpecificDeserializers() {
             SINK_TOPIC_1,
             new Serializer() {
                 @Override
-                public byte[] serialize(String topic, Object data) {
+                public byte[] serialize(final String topic, final Object data) {
                     if (data instanceof Long) {
                         return Serdes.Long().serializer().serialize(topic, (Long) data);
                     }
@@ -426,11 +443,11 @@ public void shouldUseSourceSpecificDeserializers() {
                 @Override
                 public void close() {}
                 @Override
-                public void configure(Map configs, boolean isKey) {}
+                public void configure(final Map configs, final boolean isKey) {}
             },
             new Serializer() {
                 @Override
-                public byte[] serialize(String topic, Object data) {
+                public byte[] serialize(final String topic, final Object data) {
                     if (data instanceof String) {
                         return Serdes.String().serializer().serialize(topic, (String) data);
                     }
@@ -439,7 +456,7 @@ public void configure(Map configs, boolean isKey) {}
                 @Override
                 public void close() {}
                 @Override
-                public void configure(Map configs, boolean isKey) {}
+                public void configure(final Map configs, final boolean isKey) {}
             },
             processor);
 
@@ -476,7 +493,7 @@ public void configure(Map configs, boolean isKey) {}
     }
 
     @Test
-    public void shouldUseSinkeSpecificSerializers() {
+    public void shouldUseSinkSpecificSerializers() {
         final Topology topology = new Topology();
 
         final String sourceName1 = "source-1";
@@ -691,4 +708,134 @@ public Processor get() {
         expectedStoreNames.add("globalStore");
         assertThat(testDriver.getAllStateStores().keySet(), equalTo(expectedStoreNames));
     }
+
+    private void setup() {
+        Topology topology = new Topology();
+        topology.addSource("sourceProcessor", "input-topic");
+        topology.addProcessor("aggregator", new CustomMaxAggregatorSupplier(), 
"sourceProcessor");
+        topology.addStateStore(Stores.keyValueStoreBuilder(
+            Stores.inMemoryKeyValueStore("aggStore"),
+            Serdes.String(),
+            Serdes.Long()).withLoggingDisabled(), // need to disable logging 
to allow store pre-populating
+            "aggregator");
+        topology.addSink("sinkProcessor", "result-topic", "aggregator");
+
+        config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
+        testDriver = new TopologyTestDriver(topology, config);
+
+        store = testDriver.getKeyValueStore("aggStore");
+        store.put("a", 21L);
+    }
+
+    @Test
+    public void shouldFlushStoreForFirstInput() {
+        setup();
+        testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L));
+        OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L);
+        Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
+    }
+
+    @Test
+    public void shouldNotUpdateStoreForSmallerValue() {
+        setup();
+        testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L));
+        Assert.assertThat(store.get("a"), equalTo(21L));
+        OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L);
+        Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
+    }
+
+    @Test
+    public void shouldUpdateStoreForLargerValue() {
+        setup();
+        testDriver.pipeInput(recordFactory.create("input-topic", "a", 42L, 9999L));
+        Assert.assertThat(store.get("a"), equalTo(42L));
+        OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 42L);
+        Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
+    }
+
+    @Test
+    public void shouldUpdateStoreForNewKey() {
+        setup();
+        testDriver.pipeInput(recordFactory.create("input-topic", "b", 21L, 9999L));
+        Assert.assertThat(store.get("b"), equalTo(21L));
+        OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L);
+        OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "b", 21L);
+        Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
+    }
+
+    @Test
+    public void shouldPunctuateIfEventTimeAdvances() {
+        setup();
+        testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L));
+        OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L);
+
+        testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L));
+        Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
+
+        testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 10000L));
+        OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L);
+        Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
+    }
+
+    @Test
+    public void shouldPunctuateIfWallClockTimeAdvances() {
+        setup();
+        testDriver.advanceWallClockTime(60000);
+        OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L);
+        Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
+    }
+
+    private class CustomMaxAggregatorSupplier implements ProcessorSupplier<String, Long> {
+        @Override
+        public Processor<String, Long> get() {
+            return new CustomMaxAggregator();
+        }
+    }
+
+    private class CustomMaxAggregator implements Processor<String, Long> {
+        ProcessorContext context;
+        private KeyValueStore<String, Long> store;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(final ProcessorContext context) {
+            this.context = context;
+            context.schedule(60000, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
+                @Override
+                public void punctuate(final long timestamp) {
+                    flushStore();
+                }
+            });
+            context.schedule(10000, PunctuationType.STREAM_TIME, new Punctuator() {
+                @Override
+                public void punctuate(final long timestamp) {
+                    flushStore();
+                }
+            });
+            store = (KeyValueStore<String, Long>) context.getStateStore("aggStore");
+        }
+
+        @Override
+        public void process(final String key, final Long value) {
+            final Long oldValue = store.get(key);
+            if (oldValue == null || value > oldValue) {
+                store.put(key, value);
+            }
+        }
+
+        private void flushStore() {
+            final KeyValueIterator<String, Long> it = store.all();
+            while (it.hasNext()) {
+                final KeyValue<String, Long> next = it.next();
+                context.forward(next.key, next.value);
+            }
+        }
+
+        @Override
+        public void punctuate(final long timestamp) {}
+
+        @Override
+        public void close() {}
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Move kafka-streams test fixtures into a published package
> ---------------------------------------------------------
>
>                 Key: KAFKA-3625
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3625
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Jeff Klukas
>            Assignee: Matthias J. Sax
>            Priority: Minor
>              Labels: kip, user-experience
>             Fix For: 1.1.0
>
>
> The KStreamTestDriver and related fixtures defined in 
> streams/src/test/java/org/apache/kafka/test would be useful to developers 
> building applications on top of Kafka Streams, but they are not currently 
> exposed in a package.
> I propose moving this directory to live under streams/fixtures/src/main and 
> creating a new 'streams:fixtures' project in the gradle configuration to 
> publish these as a separate package.
> KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams
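
KIP-247 ultimately published these utilities as the kafka-streams-test-utils
artifact under the org.apache.kafka group. A hedged sketch of the basic
pipe-in/read-out flow an application test gets from that package (the
pass-through topology and topic names here are illustrative, not from the KIP):

import java.util.Properties;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.streams.test.OutputVerifier;

public class TestUtilsUsageSketch {
    public static void main(final String[] args) {
        // A trivial pass-through topology: whatever goes into "input-topic"
        // is written unchanged to "output-topic".
        final Topology topology = new Topology();
        topology.addSource("source", "input-topic");
        topology.addSink("sink", "output-topic", "source");

        final Properties config = new Properties();
        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "usage-sketch");
        config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        final TopologyTestDriver driver = new TopologyTestDriver(topology, config);

        // Serialize a test record, pipe it through the topology, and verify
        // the output, all without a running broker.
        final ConsumerRecordFactory<String, String> factory =
            new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
        driver.pipeInput(factory.create("input-topic", "key", "value", 42L));
        OutputVerifier.compareKeyValue(
            driver.readOutput("output-topic", new StringDeserializer(), new StringDeserializer()),
            "key", "value");
        driver.close();
    }
}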



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
