HOTFIX: set timestamp in SinkNode guozhangwang Setting the timestamp in produced records in SinkNode. This forces the producer record's timestamp same as the context's timestamp.
Author: Yasuhiro Matsuda <[email protected]> Reviewers: Guozhang Wang <[email protected]> Closes #1137 from ymatsuda/set_timestamp_in_sinknode Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/80ba01e1 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/80ba01e1 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/80ba01e1 Branch: refs/heads/0.10.0 Commit: 80ba01e16ba17c48058987ee3a1384f1e23df343 Parents: 625c516 Author: Yasuhiro Matsuda <[email protected]> Authored: Mon Apr 4 14:57:15 2016 -0700 Committer: Gwen Shapira <[email protected]> Committed: Tue Apr 5 17:08:53 2016 -0700 ---------------------------------------------------------------------- .../org/apache/kafka/streams/processor/internals/SinkNode.java | 2 +- streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/80ba01e1/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java index ffc72fd..31a558b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java @@ -57,7 +57,7 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> { public void process(K key, V value) { // send to all the registered topics RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector(); - collector.send(new ProducerRecord<>(topic, key, value), keySerializer, valSerializer, partitioner); + collector.send(new ProducerRecord<>(topic, null, context.timestamp(), key, value), keySerializer, valSerializer, partitioner); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/80ba01e1/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java index 05713c1..0c56c26 100644 --- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java @@ -57,6 +57,7 @@ public class KStreamTestDriver { this.topology = builder.build("X", null); this.stateDir = stateDir; this.context = new MockProcessorContext(this, stateDir, keySerde, valSerde, new MockRecordCollector()); + this.context.setTime(0L); for (StateStoreSupplier stateStoreSupplier : topology.stateStoreSuppliers()) { StateStore store = stateStoreSupplier.get();
