[ https://issues.apache.org/jira/browse/KAFKA-5233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352966#comment-16352966 ]
ASF GitHub Bot commented on KAFKA-5233: --------------------------------------- guozhangwang closed pull request #3678: KAFKA-5233 follow up URL: https://github.com/apache/kafka/pull/3678 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java index d191891afe9..5aa7c7d94bb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java @@ -24,6 +24,7 @@ import org.apache.kafka.streams.kstream.Transformer; import org.apache.kafka.streams.kstream.TransformerSupplier; import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockProcessorSupplier; import org.junit.Rule; @@ -39,38 +40,73 @@ @Rule public final KStreamTestDriver driver = new KStreamTestDriver(); + private Punctuator punctuator; + + private final TransformerSupplier<Number, Number, KeyValue<Integer, Integer>> transformerSupplier = + new TransformerSupplier<Number, Number, KeyValue<Integer, Integer>>() { + public Transformer<Number, Number, KeyValue<Integer, Integer>> get() { + return new Transformer<Number, Number, KeyValue<Integer, Integer>>() { + + private int total = 0; + + @Override + public void init(final ProcessorContext context) { + punctuator = new Punctuator() { + @Override + public void punctuate(long timestamp) { + context.forward(-1, (int) timestamp); + } + }; + } + + @Override + public KeyValue<Integer, Integer> 
transform(Number key, Number value) { + total += value.intValue(); + return KeyValue.pair(key.intValue() * 2, total); + } + + @Override + public KeyValue<Integer, Integer> punctuate(long timestamp) { + return KeyValue.pair(-1, (int) timestamp); + } + + @Override + public void close() { + } + }; + } + }; + @Test public void testTransform() { StreamsBuilder builder = new StreamsBuilder(); - TransformerSupplier<Number, Number, KeyValue<Integer, Integer>> transformerSupplier = - new TransformerSupplier<Number, Number, KeyValue<Integer, Integer>>() { - public Transformer<Number, Number, KeyValue<Integer, Integer>> get() { - return new Transformer<Number, Number, KeyValue<Integer, Integer>>() { + final int[] expectedKeys = {1, 10, 100, 1000}; + + MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>(); + KStream<Integer, Integer> stream = builder.stream(intSerde, intSerde, topicName); + stream.transform(transformerSupplier).process(processor); + + driver.setUp(builder); + for (int expectedKey : expectedKeys) { + driver.process(topicName, expectedKey, expectedKey * 10); + } - private int total = 0; + driver.punctuate(2, punctuator); + driver.punctuate(3, punctuator); - @Override - public void init(ProcessorContext context) { - } + assertEquals(6, processor.processed.size()); - @Override - public KeyValue<Integer, Integer> transform(Number key, Number value) { - total += value.intValue(); - return KeyValue.pair(key.intValue() * 2, total); - } + String[] expected = {"2:10", "20:110", "200:1110", "2000:11110", "-1:2", "-1:3"}; - @Override - public KeyValue<Integer, Integer> punctuate(long timestamp) { - return KeyValue.pair(-1, (int) timestamp); - } + for (int i = 0; i < expected.length; i++) { + assertEquals(expected[i], processor.processed.get(i)); + } + } - @Override - public void close() { - } - }; - } - }; + @Test @Deprecated + public void testTransformWithDeprecatedPunctuate() { + StreamsBuilder builder = new StreamsBuilder(); final int[] 
expectedKeys = {1, 10, 100, 1000}; @@ -83,8 +119,8 @@ public void close() { driver.process(topicName, expectedKey, expectedKey * 10); } - driver.punctuate(2); - driver.punctuate(3); + driver.punctuateDeprecated(2); + driver.punctuateDeprecated(3); assertEquals(6, processor.processed.size()); diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java index 474ef5c96a7..e6c5824b3b2 100644 --- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java @@ -24,6 +24,7 @@ import org.apache.kafka.streams.StreamsBuilderTest; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; @@ -189,7 +190,23 @@ private ProcessorNode sourceNodeByTopicName(final String topicName) { return topicNode; } - public void punctuate(final long timestamp) { + public void punctuate(final long timestamp, Punctuator punctuator) { + final ProcessorNode prevNode = context.currentNode(); + for (final ProcessorNode processor : topology.processors()) { + if (processor.processor() != null) { + context.setRecordContext(createRecordContext(timestamp)); + context.setCurrentNode(processor); + try { + processor.punctuate(timestamp, punctuator); + } finally { + context.setCurrentNode(prevNode); + } + } + } + } + + @Deprecated + public void punctuateDeprecated(final long timestamp) { final ProcessorNode prevNode = context.currentNode(); for (final ProcessorNode processor : topology.processors()) { if (processor.processor() != null) { ---------------------------------------------------------------- This is an automated message from the 
Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Changes to punctuate semantics (KIP-138) > ---------------------------------------- > > Key: KAFKA-5233 > URL: https://issues.apache.org/jira/browse/KAFKA-5233 > Project: Kafka > Issue Type: Improvement > Components: streams > Reporter: Michal Borowiecki > Assignee: Michal Borowiecki > Priority: Major > Labels: kip > Fix For: 1.0.0 > > > This ticket is to track implementation of > [KIP-138: Change punctuate semantics|https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics] -- This message was sent by Atlassian JIRA (v7.6.3#76005)