[ 
https://issues.apache.org/jira/browse/KAFKA-5233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352966#comment-16352966
 ] 

ASF GitHub Bot commented on KAFKA-5233:
---------------------------------------

guozhangwang closed pull request #3678: KAFKA-5233 follow up
URL: https://github.com/apache/kafka/pull/3678
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
index d191891afe9..5aa7c7d94bb 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
@@ -24,6 +24,7 @@
 import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.junit.Rule;
@@ -39,38 +40,73 @@
     @Rule
     public final KStreamTestDriver driver = new KStreamTestDriver();
 
+    private Punctuator punctuator;
+
+    private final TransformerSupplier<Number, Number, KeyValue<Integer, 
Integer>> transformerSupplier =
+        new TransformerSupplier<Number, Number, KeyValue<Integer, Integer>>() {
+            public Transformer<Number, Number, KeyValue<Integer, Integer>> 
get() {
+                return new Transformer<Number, Number, KeyValue<Integer, 
Integer>>() {
+
+                    private int total = 0;
+
+                    @Override
+                    public void init(final ProcessorContext context) {
+                        punctuator = new Punctuator() {
+                            @Override
+                            public void punctuate(long timestamp) {
+                                context.forward(-1, (int) timestamp);
+                            }
+                        };
+                    }
+
+                    @Override
+                    public KeyValue<Integer, Integer> transform(Number key, 
Number value) {
+                        total += value.intValue();
+                        return KeyValue.pair(key.intValue() * 2, total);
+                    }
+
+                    @Override
+                    public KeyValue<Integer, Integer> punctuate(long 
timestamp) {
+                        return KeyValue.pair(-1, (int) timestamp);
+                    }
+
+                    @Override
+                    public void close() {
+                    }
+                };
+            }
+        };
+
     @Test
     public void testTransform() {
         StreamsBuilder builder = new StreamsBuilder();
 
-        TransformerSupplier<Number, Number, KeyValue<Integer, Integer>> 
transformerSupplier =
-            new TransformerSupplier<Number, Number, KeyValue<Integer, 
Integer>>() {
-                public Transformer<Number, Number, KeyValue<Integer, Integer>> 
get() {
-                    return new Transformer<Number, Number, KeyValue<Integer, 
Integer>>() {
+        final int[] expectedKeys = {1, 10, 100, 1000};
+
+        MockProcessorSupplier<Integer, Integer> processor = new 
MockProcessorSupplier<>();
+        KStream<Integer, Integer> stream = builder.stream(intSerde, intSerde, 
topicName);
+        stream.transform(transformerSupplier).process(processor);
+
+        driver.setUp(builder);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topicName, expectedKey, expectedKey * 10);
+        }
 
-                        private int total = 0;
+        driver.punctuate(2, punctuator);
+        driver.punctuate(3, punctuator);
 
-                        @Override
-                        public void init(ProcessorContext context) {
-                        }
+        assertEquals(6, processor.processed.size());
 
-                        @Override
-                        public KeyValue<Integer, Integer> transform(Number 
key, Number value) {
-                            total += value.intValue();
-                            return KeyValue.pair(key.intValue() * 2, total);
-                        }
+        String[] expected = {"2:10", "20:110", "200:1110", "2000:11110", 
"-1:2", "-1:3"};
 
-                        @Override
-                        public KeyValue<Integer, Integer> punctuate(long 
timestamp) {
-                            return KeyValue.pair(-1, (int) timestamp);
-                        }
+        for (int i = 0; i < expected.length; i++) {
+            assertEquals(expected[i], processor.processed.get(i));
+        }
+    }
 
-                        @Override
-                        public void close() {
-                        }
-                    };
-                }
-            };
+    @Test @Deprecated
+    public void testTransformWithDeprecatedPunctuate() {
+        StreamsBuilder builder = new StreamsBuilder();
 
         final int[] expectedKeys = {1, 10, 100, 1000};
 
@@ -83,8 +119,8 @@ public void close() {
             driver.process(topicName, expectedKey, expectedKey * 10);
         }
 
-        driver.punctuate(2);
-        driver.punctuate(3);
+        driver.punctuateDeprecated(2);
+        driver.punctuateDeprecated(3);
 
         assertEquals(6, processor.processed.size());
 
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java 
b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 474ef5c96a7..e6c5824b3b2 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -24,6 +24,7 @@
 import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
@@ -189,7 +190,23 @@ private ProcessorNode sourceNodeByTopicName(final String 
topicName) {
         return topicNode;
     }
 
-    public void punctuate(final long timestamp) {
+    public void punctuate(final long timestamp, Punctuator punctuator) {
+        final ProcessorNode prevNode = context.currentNode();
+        for (final ProcessorNode processor : topology.processors()) {
+            if (processor.processor() != null) {
+                context.setRecordContext(createRecordContext(timestamp));
+                context.setCurrentNode(processor);
+                try {
+                    processor.punctuate(timestamp, punctuator);
+                } finally {
+                    context.setCurrentNode(prevNode);
+                }
+            }
+        }
+    }
+
+    @Deprecated
+    public void punctuateDeprecated(final long timestamp) {
         final ProcessorNode prevNode = context.currentNode();
         for (final ProcessorNode processor : topology.processors()) {
             if (processor.processor() != null) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Changes to punctuate semantics (KIP-138)
> ----------------------------------------
>
>                 Key: KAFKA-5233
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5233
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Michal Borowiecki
>            Assignee: Michal Borowiecki
>            Priority: Major
>              Labels: kip
>             Fix For: 1.0.0
>
>
> This ticket is to track implementation of 
> [KIP-138: Change punctuate 
> semantics|https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to