This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 32d9dec9e15 rebase to fix merge conflict (#17702)
32d9dec9e15 is described below

commit 32d9dec9e15c6e411f78ff5f4b60a14537f90d4c
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Fri Nov 8 19:04:07 2024 -0800

    rebase to fix merge conflict (#17702)
    
    Fixes an issue with the TTD in the specific case where users don't specify 
an initial time for the driver and also don't specify a start timestamp for the 
TestInputTopic, then pipe input records without timestamps. This combination 
results in a slight mismatch in the expected timestamps for the piped records, 
which can be noticeable when writing tests with very small time deltas.
    
    The problem is that, while both the TTD and the TestInputTopic will be 
initialized to the "current time" when not otherwise specified, it's possible 
for some milliseconds to have passed between the creation of the TTD and the 
creation of the TestInputTopic. This can result in a TestInputTopic getting a 
start timestamp that's several ms larger than the driver's time, and ultimately 
causing the piped input records to have timestamps slightly in the future 
relative to the driver.
    
    In practice even those who hit this issue might not notice it if they 
aren't manipulating time in their tests, or are advancing time by enough to 
negate the several-milliseconds of difference. However we noticed a test fail 
due to this because we were testing a ttl-based processor and had advanced the 
driver time by only 1 millisecond past the ttl. The piped record should have 
been expired, but because it's timestamp was a few milliseconds longer than the 
driver's start time, this tes [...]
    
    Reviewers: Matthias Sax <[email protected]>, Bruno Cadonna 
<[email protected]>, Lucas Brutschy < [email protected]>
---
 .../processor/internals/StreamsProducer.java       |  2 +-
 .../apache/kafka/streams/TopologyTestDriver.java   | 35 ++++++++--------------
 2 files changed, 14 insertions(+), 23 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
index b7735655763..1048b5a2ecf 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
@@ -239,7 +239,7 @@ public class StreamsProducer {
      * @throws IllegalStateException if EOS is disabled
      * @throws TaskMigratedException
      */
-    protected void commitTransaction(final Map<TopicPartition, 
OffsetAndMetadata> offsets,
+    public void commitTransaction(final Map<TopicPartition, OffsetAndMetadata> 
offsets,
                                      final ConsumerGroupMetadata 
consumerGroupMetadata) {
         if (!eosEnabled()) {
             throw new IllegalStateException(formatException("Exactly-once is 
not enabled"));
diff --git 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 271824ceeb5..cbf314e4426 100644
--- 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -29,7 +29,6 @@ import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -44,7 +43,6 @@ import org.apache.kafka.streams.TopologyConfig.TaskConfig;
 import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.internals.StreamsConfigUtils;
-import org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
@@ -233,7 +231,7 @@ public class TopologyTestDriver implements Closeable {
 
     private final MockConsumer<byte[], byte[]> consumer;
     private final MockProducer<byte[], byte[]> producer;
-    private final TestDriverProducer testDriverProducer;
+    private final StreamsProducer testDriverProducer;
 
     private final Map<String, TopicPartition> partitionsByInputTopic = new 
HashMap<>();
     private final Map<String, TopicPartition> globalPartitionsByInputTopic = 
new HashMap<>();
@@ -345,7 +343,8 @@ public class TopologyTestDriver implements Closeable {
                 return Collections.singletonList(new PartitionInfo(topic, 
PARTITION_ID, null, null, null));
             }
         };
-        testDriverProducer = new TestDriverProducer(
+
+        testDriverProducer = new StreamsProducer(
             producer,
             StreamsConfigUtils.processingMode(streamsConfig),
             mockWallClockTime,
@@ -739,7 +738,14 @@ public class TopologyTestDriver implements Closeable {
     public final <K, V> TestInputTopic<K, V> createInputTopic(final String 
topicName,
                                                               final 
Serializer<K> keySerializer,
                                                               final 
Serializer<V> valueSerializer) {
-        return new TestInputTopic<>(this, topicName, keySerializer, 
valueSerializer, Instant.now(), Duration.ZERO);
+        return new TestInputTopic<>(
+            this,
+            topicName,
+            keySerializer,
+            valueSerializer,
+            Instant.ofEpochMilli(mockWallClockTime.milliseconds()),
+            Duration.ZERO
+        );
     }
 
     /**
@@ -986,7 +992,7 @@ public class TopologyTestDriver implements Closeable {
     public <K, V> KeyValueStore<K, V> getKeyValueStore(final String name) {
         final StateStore store = getStateStore(name, false);
         if (store instanceof TimestampedKeyValueStore) {
-            log.info("Method #getTimestampedKeyValueStore() should be used to 
access a TimestampedKeyValueStore.");
+            log.warn("Method #getTimestampedKeyValueStore() should be used to 
access a TimestampedKeyValueStore.");
             return new KeyValueStoreFacade<>((TimestampedKeyValueStore<K, V>) 
store);
         }
         return store instanceof KeyValueStore ? (KeyValueStore<K, V>) store : 
null;
@@ -1064,7 +1070,7 @@ public class TopologyTestDriver implements Closeable {
     public <K, V> WindowStore<K, V> getWindowStore(final String name) {
         final StateStore store = getStateStore(name, false);
         if (store instanceof TimestampedWindowStore) {
-            log.info("Method #getTimestampedWindowStore() should be used to 
access a TimestampedWindowStore.");
+            log.warn("Method #getTimestampedWindowStore() should be used to 
access a TimestampedWindowStore.");
             return new WindowStoreFacade<>((TimestampedWindowStore<K, V>) 
store);
         }
         return store instanceof WindowStore ? (WindowStore<K, V>) store : null;
@@ -1352,19 +1358,4 @@ public class TopologyTestDriver implements Closeable {
         }
     }
 
-    private static class TestDriverProducer extends StreamsProducer {
-
-        public TestDriverProducer(final Producer<byte[], byte[]> producer,
-                                  final ProcessingMode processingMode,
-                                  final Time time,
-                                  final LogContext logContext) {
-            super(producer, processingMode, time, logContext);
-        }
-
-        @Override
-        public void commitTransaction(final Map<TopicPartition, 
OffsetAndMetadata> offsets,
-                                      final ConsumerGroupMetadata 
consumerGroupMetadata) throws ProducerFencedException {
-            super.commitTransaction(offsets, consumerGroupMetadata);
-        }
-    }
 }

Reply via email to