This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 32d9dec9e15 rebase to fix merge conflict (#17702)
32d9dec9e15 is described below
commit 32d9dec9e15c6e411f78ff5f4b60a14537f90d4c
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Fri Nov 8 19:04:07 2024 -0800
rebase to fix merge conflict (#17702)
Fixes an issue with the TTD in the specific case where users don't specify
an initial time for the driver and also don't specify a start timestamp for the
TestInputTopic, then pipe input records without timestamps. This combination
results in a slight mismatch in the expected timestamps for the piped records,
which can be noticeable when writing tests with very small time deltas.
The problem is that, while both the TTD and the TestInputTopic will be
initialized to the "current time" when not otherwise specified, it's possible
for some milliseconds to have passed between the creation of the TTD and the
creation of the TestInputTopic. This can result in a TestInputTopic getting a
start timestamp that's several ms larger than the driver's time, and ultimately
causing the piped input records to have timestamps slightly in the future
relative to the driver.
In practice even those who hit this issue might not notice it if they
aren't manipulating time in their tests, or are advancing time by enough to
negate the several-milliseconds of difference. However we noticed a test fail
due to this because we were testing a ttl-based processor and had advanced the
driver time by only 1 millisecond past the ttl. The piped record should have
been expired, but because it's timestamp was a few milliseconds longer than the
driver's start time, this tes [...]
Reviewers: Matthias Sax <[email protected]>, Bruno Cadonna
<[email protected]>, Lucas Brutschy < [email protected]>
---
.../processor/internals/StreamsProducer.java | 2 +-
.../apache/kafka/streams/TopologyTestDriver.java | 35 ++++++++--------------
2 files changed, 14 insertions(+), 23 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
index b7735655763..1048b5a2ecf 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
@@ -239,7 +239,7 @@ public class StreamsProducer {
* @throws IllegalStateException if EOS is disabled
* @throws TaskMigratedException
*/
- protected void commitTransaction(final Map<TopicPartition,
OffsetAndMetadata> offsets,
+ public void commitTransaction(final Map<TopicPartition, OffsetAndMetadata>
offsets,
final ConsumerGroupMetadata
consumerGroupMetadata) {
if (!eosEnabled()) {
throw new IllegalStateException(formatException("Exactly-once is
not enabled"));
diff --git
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 271824ceeb5..cbf314e4426 100644
---
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -29,7 +29,6 @@ import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
@@ -44,7 +43,6 @@ import org.apache.kafka.streams.TopologyConfig.TaskConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.internals.StreamsConfigUtils;
-import org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
@@ -233,7 +231,7 @@ public class TopologyTestDriver implements Closeable {
private final MockConsumer<byte[], byte[]> consumer;
private final MockProducer<byte[], byte[]> producer;
- private final TestDriverProducer testDriverProducer;
+ private final StreamsProducer testDriverProducer;
private final Map<String, TopicPartition> partitionsByInputTopic = new
HashMap<>();
private final Map<String, TopicPartition> globalPartitionsByInputTopic =
new HashMap<>();
@@ -345,7 +343,8 @@ public class TopologyTestDriver implements Closeable {
return Collections.singletonList(new PartitionInfo(topic,
PARTITION_ID, null, null, null));
}
};
- testDriverProducer = new TestDriverProducer(
+
+ testDriverProducer = new StreamsProducer(
producer,
StreamsConfigUtils.processingMode(streamsConfig),
mockWallClockTime,
@@ -739,7 +738,14 @@ public class TopologyTestDriver implements Closeable {
public final <K, V> TestInputTopic<K, V> createInputTopic(final String
topicName,
final
Serializer<K> keySerializer,
final
Serializer<V> valueSerializer) {
- return new TestInputTopic<>(this, topicName, keySerializer,
valueSerializer, Instant.now(), Duration.ZERO);
+ return new TestInputTopic<>(
+ this,
+ topicName,
+ keySerializer,
+ valueSerializer,
+ Instant.ofEpochMilli(mockWallClockTime.milliseconds()),
+ Duration.ZERO
+ );
}
/**
@@ -986,7 +992,7 @@ public class TopologyTestDriver implements Closeable {
public <K, V> KeyValueStore<K, V> getKeyValueStore(final String name) {
final StateStore store = getStateStore(name, false);
if (store instanceof TimestampedKeyValueStore) {
- log.info("Method #getTimestampedKeyValueStore() should be used to
access a TimestampedKeyValueStore.");
+ log.warn("Method #getTimestampedKeyValueStore() should be used to
access a TimestampedKeyValueStore.");
return new KeyValueStoreFacade<>((TimestampedKeyValueStore<K, V>)
store);
}
return store instanceof KeyValueStore ? (KeyValueStore<K, V>) store :
null;
@@ -1064,7 +1070,7 @@ public class TopologyTestDriver implements Closeable {
public <K, V> WindowStore<K, V> getWindowStore(final String name) {
final StateStore store = getStateStore(name, false);
if (store instanceof TimestampedWindowStore) {
- log.info("Method #getTimestampedWindowStore() should be used to
access a TimestampedWindowStore.");
+ log.warn("Method #getTimestampedWindowStore() should be used to
access a TimestampedWindowStore.");
return new WindowStoreFacade<>((TimestampedWindowStore<K, V>)
store);
}
return store instanceof WindowStore ? (WindowStore<K, V>) store : null;
@@ -1352,19 +1358,4 @@ public class TopologyTestDriver implements Closeable {
}
}
- private static class TestDriverProducer extends StreamsProducer {
-
- public TestDriverProducer(final Producer<byte[], byte[]> producer,
- final ProcessingMode processingMode,
- final Time time,
- final LogContext logContext) {
- super(producer, processingMode, time, logContext);
- }
-
- @Override
- public void commitTransaction(final Map<TopicPartition,
OffsetAndMetadata> offsets,
- final ConsumerGroupMetadata
consumerGroupMetadata) throws ProducerFencedException {
- super.commitTransaction(offsets, consumerGroupMetadata);
- }
- }
}