[hotfix] [Kafka Consumer] Clean up some code confusion and style in the Fetchers for Kafka 0.9/0.10
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fa1864c7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fa1864c7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fa1864c7 Branch: refs/heads/master Commit: fa1864c7a6eadea55eb2d7e8fd2b72e043841671 Parents: 611412c Author: Stephan Ewen <[email protected]> Authored: Wed Nov 9 17:58:54 2016 +0100 Committer: Stephan Ewen <[email protected]> Committed: Wed Nov 16 19:08:07 2016 +0100 ---------------------------------------------------------------------- .../flink-connector-kafka-0.10/pom.xml | 6 ++ .../kafka/internal/Kafka010Fetcher.java | 39 +++++-------- .../connectors/kafka/Kafka010FetcherTest.java | 1 - .../kafka/internals/SimpleConsumerThread.java | 2 +- .../kafka/internal/Kafka09Fetcher.java | 25 +++++--- .../kafka/internals/AbstractFetcher.java | 60 ++++++++++++++------ .../AbstractFetcherTimestampsTest.java | 53 +++++++++-------- 7 files changed, 107 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/fa1864c7/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml b/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml index 8108afc..04019f8 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml +++ b/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml @@ -48,6 +48,12 @@ under the License. <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.9_2.10</artifactId> <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_${scala.binary.version}</artifactId> + </exclusion> + </exclusions> </dependency> <!-- Add Kafka 0.10.x as a dependency --> http://git-wip-us.apache.org/repos/asf/flink/blob/fa1864c7/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java index 4a1f5f6..024cd38 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java @@ -38,6 +38,9 @@ import java.util.Properties; /** * A fetcher that fetches data from Kafka brokers via the Kafka 0.10 consumer API. * + * <p>This fetcher re-uses basically all functionality of the 0.9 fetcher. It only additionally + * takes the KafkaRecord-attached timestamp and attaches it to the Flink records. + * * @param <T> The type of elements produced by the fetcher. */ public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> { @@ -76,37 +79,23 @@ public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> { } @Override - protected void assignPartitionsToConsumer(KafkaConsumer<byte[], byte[]> consumer, List<TopicPartition> topicPartitions) { - consumer.assign(topicPartitions); - } + protected void emitRecord( + T record, + KafkaTopicPartitionState<TopicPartition> partition, + long offset, + ConsumerRecord<?, ?> consumerRecord) throws Exception { - @Override - protected void emitRecord(T record, KafkaTopicPartitionState<TopicPartition> partition, long offset, ConsumerRecord consumerRecord) throws Exception { - // get timestamp from provided ConsumerRecord (only possible with kafka 0.10.x) - super.emitRecord(record, partition, offset, consumerRecord.timestamp()); + // we attach the Kafka 0.10 timestamp here + emitRecordWithTimestamp(record, partition, offset, consumerRecord.timestamp()); } /** - * Emit record Kafka-timestamp aware. + * This method needs to be overridden because Kafka broke binary compatibility between 0.9 and 0.10, + * changing the List in the signature to a Collection. */ @Override - protected void emitRecord(T record, KafkaTopicPartitionState<TopicPartition> partitionState, long offset, long timestamp) throws Exception { - if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) { - // fast path logic, in case there are no watermarks - - // emit the record, using the checkpoint lock to guarantee - // atomicity of record emission and offset state update - synchronized (checkpointLock) { - sourceContext.collectWithTimestamp(record, timestamp); - partitionState.setOffset(offset); - } - } - else if (timestampWatermarkMode == PERIODIC_WATERMARKS) { - emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset, timestamp); - } - else { - emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset, timestamp); - } + protected void assignPartitionsToConsumer(KafkaConsumer<?, ?> consumer, List<TopicPartition> topicPartitions) { + consumer.assign(topicPartitions); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/fa1864c7/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java index 718db48..037d25b 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java @@ -114,7 +114,6 @@ public class Kafka010FetcherTest { SourceContext<String> sourceContext = mock(SourceContext.class); List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42)); KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()); - StreamingRuntimeContext context = mock(StreamingRuntimeContext.class); final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>( sourceContext, http://git-wip-us.apache.org/repos/asf/flink/blob/fa1864c7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java index 1302348..35e491a 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java @@ -376,7 +376,7 @@ class SimpleConsumerThread<T> extends Thread { continue partitionsLoop; } - owner.emitRecord(value, currentPartition, offset, Long.MIN_VALUE); + owner.emitRecord(value, currentPartition, offset); } else { // no longer running http://git-wip-us.apache.org/repos/asf/flink/blob/fa1864c7/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java index a8c0397..acdcb61 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java @@ -201,7 +201,6 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem try { assignPartitionsToConsumer(consumer, convertKafkaPartitions(subscribedPartitions())); - if (useMetrics) { final MetricGroup kafkaMetricGroup = metricGroup.addGroup("KafkaConsumer"); addOffsetStateGauge(kafkaMetricGroup); @@ -306,14 +305,22 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem } } - // Kafka09Fetcher ignores the timestamp, Kafka010Fetcher is extracting the timestamp and passing it to the emitRecord() method. - protected void emitRecord(T record, KafkaTopicPartitionState<TopicPartition> partition, long offset, ConsumerRecord consumerRecord) throws Exception { - emitRecord(record, partition, offset, Long.MIN_VALUE); + // ------------------------------------------------------------------------ + // The below methods are overridden in the 0.10 fetcher, which otherwise + // reuses most of the 0.9 fetcher behavior + // ------------------------------------------------------------------------ + + protected void emitRecord( + T record, + KafkaTopicPartitionState<TopicPartition> partition, + long offset, + @SuppressWarnings("UnusedParameters") ConsumerRecord<?, ?> consumerRecord) throws Exception { + + // the 0.9 Fetcher does not try to extract a timestamp + emitRecord(record, partition, offset); } - /** - * Protected method to make the partition assignment pluggable, for different Kafka versions. - */ - protected void assignPartitionsToConsumer(KafkaConsumer<byte[], byte[]> consumer, List<TopicPartition> topicPartitions) { + + protected void assignPartitionsToConsumer(KafkaConsumer<?, ?> consumer, List<TopicPartition> topicPartitions) { consumer.assign(topicPartitions); } @@ -322,7 +329,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem } // ------------------------------------------------------------------------ - // Kafka 0.9 specific fetcher behavior + // Implement Methods of the AbstractFetcher // ------------------------------------------------------------------------ @Override http://git-wip-us.apache.org/repos/asf/flink/blob/fa1864c7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java index 3350b06..cf39606 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java @@ -205,32 +205,60 @@ public abstract class AbstractFetcher<T, KPH> { } } } - + // ------------------------------------------------------------------------ // emitting records // ------------------------------------------------------------------------ /** + * Emits a record without attaching an existing timestamp to it. + * * <p>Implementation Note: This method is kept brief to be JIT inlining friendly. * That makes the fast path efficient, the extended paths are called as separate methods. + * * @param record The record to emit * @param partitionState The state of the Kafka partition from which the record was fetched * @param offset The offset of the record - * @param timestamp The record's event-timestamp */ - protected void emitRecord(T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long timestamp) throws Exception { + protected void emitRecord(T record, KafkaTopicPartitionState<KPH> partitionState, long offset) throws Exception { if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) { // fast path logic, in case there are no watermarks // emit the record, using the checkpoint lock to guarantee // atomicity of record emission and offset state update synchronized (checkpointLock) { - if(timestamp != Long.MIN_VALUE) { - // this case is true for Kafka 0.10 - sourceContext.collectWithTimestamp(record, timestamp); - } else { - sourceContext.collect(record); - } + sourceContext.collect(record); + partitionState.setOffset(offset); + } + } + else if (timestampWatermarkMode == PERIODIC_WATERMARKS) { + emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset, Long.MIN_VALUE); + } + else { + emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset, Long.MIN_VALUE); + } + } + + /** + * Emits a record attaching a timestamp to it. + * + * <p>Implementation Note: This method is kept brief to be JIT inlining friendly. + * That makes the fast path efficient, the extended paths are called as separate methods. + * + * @param record The record to emit + * @param partitionState The state of the Kafka partition from which the record was fetched + * @param offset The offset of the record + */ + protected void emitRecordWithTimestamp( + T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long timestamp) throws Exception { + + if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) { + // fast path logic, in case there are no watermarks generated in the fetcher + + // emit the record, using the checkpoint lock to guarantee + // atomicity of record emission and offset state update + synchronized (checkpointLock) { + sourceContext.collectWithTimestamp(record, timestamp); partitionState.setOffset(offset); } } @@ -285,14 +313,14 @@ public abstract class AbstractFetcher<T, KPH> { // from the punctuated extractor final long timestamp = withWatermarksState.getTimestampForRecord(record, kafkaEventTimestamp); final Watermark newWatermark = withWatermarksState.checkAndGetNewWatermark(record, timestamp); - + // emit the record with timestamp, using the usual checkpoint lock to guarantee // atomicity of record emission and offset state update synchronized (checkpointLock) { sourceContext.collectWithTimestamp(record, timestamp); partitionState.setOffset(offset); } - + // if we also have a new per-partition watermark, check if that is also a // new cross-partition watermark if (newWatermark != null) { @@ -306,7 +334,7 @@ public abstract class AbstractFetcher<T, KPH> { private void updateMinPunctuatedWatermark(Watermark nextWatermark) { if (nextWatermark.getTimestamp() > maxWatermarkSoFar) { long newMin = Long.MAX_VALUE; - + for (KafkaTopicPartitionState<?> state : allPartitions) { @SuppressWarnings("unchecked") final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState = @@ -314,7 +342,7 @@ public abstract class AbstractFetcher<T, KPH> { newMin = Math.min(newMin, withWatermarksState.getCurrentPartitionWatermark()); } - + // double-check locking pattern if (newMin > maxWatermarkSoFar) { synchronized (checkpointLock) { @@ -416,7 +444,7 @@ public abstract class AbstractFetcher<T, KPH> { // add current offsets to gage MetricGroup currentOffsets = metricGroup.addGroup("current-offsets"); MetricGroup committedOffsets = metricGroup.addGroup("committed-offsets"); - for(KafkaTopicPartitionState ktp: subscribedPartitions()){ + for (KafkaTopicPartitionState<?> ktp: subscribedPartitions()) { currentOffsets.gauge(ktp.getTopic() + "-" + ktp.getPartition(), new OffsetGauge(ktp, OffsetGaugeType.CURRENT_OFFSET)); committedOffsets.gauge(ktp.getTopic() + "-" + ktp.getPartition(), new OffsetGauge(ktp, OffsetGaugeType.COMMITTED_OFFSET)); } @@ -435,10 +463,10 @@ public abstract class AbstractFetcher<T, KPH> { */ private static class OffsetGauge implements Gauge<Long> { - private final KafkaTopicPartitionState ktp; + private final KafkaTopicPartitionState<?> ktp; private final OffsetGaugeType gaugeType; - public OffsetGauge(KafkaTopicPartitionState ktp, OffsetGaugeType gaugeType) { + public OffsetGauge(KafkaTopicPartitionState<?> ktp, OffsetGaugeType gaugeType) { this.ktp = ktp; this.gaugeType = gaugeType; } http://git-wip-us.apache.org/repos/asf/flink/blob/fa1864c7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java index 5801c24..0b3507a 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java @@ -33,7 +33,6 @@ import javax.annotation.Nullable; import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.*; @@ -67,22 +66,22 @@ public class AbstractFetcherTimestampsTest { // elements generate a watermark if the timestamp is a multiple of three // elements for partition 1 - fetcher.emitRecord(1L, part1, 1L, Long.MIN_VALUE); - fetcher.emitRecord(2L, part1, 2L, Long.MIN_VALUE); - fetcher.emitRecord(3L, part1, 3L, Long.MIN_VALUE); + fetcher.emitRecord(1L, part1, 1L); + fetcher.emitRecord(2L, part1, 2L); + fetcher.emitRecord(3L, part1, 3L); assertEquals(3L, sourceContext.getLatestElement().getValue().longValue()); assertEquals(3L, sourceContext.getLatestElement().getTimestamp()); assertFalse(sourceContext.hasWatermark()); // elements for partition 2 - fetcher.emitRecord(12L, part2, 1L, Long.MIN_VALUE); + fetcher.emitRecord(12L, part2, 1L); assertEquals(12L, sourceContext.getLatestElement().getValue().longValue()); assertEquals(12L, sourceContext.getLatestElement().getTimestamp()); assertFalse(sourceContext.hasWatermark()); // elements for partition 3 - fetcher.emitRecord(101L, part3, 1L, Long.MIN_VALUE); - fetcher.emitRecord(102L, part3, 2L, Long.MIN_VALUE); + fetcher.emitRecord(101L, part3, 1L); + fetcher.emitRecord(102L, part3, 2L); assertEquals(102L, sourceContext.getLatestElement().getValue().longValue()); assertEquals(102L, sourceContext.getLatestElement().getTimestamp()); @@ -91,25 +90,25 @@ public class AbstractFetcherTimestampsTest { assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp()); // advance partition 3 - fetcher.emitRecord(1003L, part3, 3L, Long.MIN_VALUE); - fetcher.emitRecord(1004L, part3, 4L, Long.MIN_VALUE); - fetcher.emitRecord(1005L, part3, 5L, Long.MIN_VALUE); + fetcher.emitRecord(1003L, part3, 3L); + fetcher.emitRecord(1004L, part3, 4L); + fetcher.emitRecord(1005L, part3, 5L); assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue()); assertEquals(1005L, sourceContext.getLatestElement().getTimestamp()); // advance partition 1 beyond partition 2 - this bumps the watermark - fetcher.emitRecord(30L, part1, 4L, Long.MIN_VALUE); + fetcher.emitRecord(30L, part1, 4L); assertEquals(30L, sourceContext.getLatestElement().getValue().longValue()); assertEquals(30L, sourceContext.getLatestElement().getTimestamp()); assertTrue(sourceContext.hasWatermark()); assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp()); // advance partition 2 again - this bumps the watermark - fetcher.emitRecord(13L, part2, 2L, Long.MIN_VALUE); + fetcher.emitRecord(13L, part2, 2L); assertFalse(sourceContext.hasWatermark()); - fetcher.emitRecord(14L, part2, 3L, Long.MIN_VALUE); + fetcher.emitRecord(14L, part2, 3L); assertFalse(sourceContext.hasWatermark()); - fetcher.emitRecord(15L, part2, 3L, Long.MIN_VALUE); + fetcher.emitRecord(15L, part2, 3L); assertTrue(sourceContext.hasWatermark()); assertEquals(15L, sourceContext.getLatestWatermark().getTimestamp()); } @@ -141,20 +140,20 @@ public class AbstractFetcherTimestampsTest { // elements generate a watermark if the timestamp is a multiple of three // elements for partition 1 - fetcher.emitRecord(1L, part1, 1L, Long.MIN_VALUE); - fetcher.emitRecord(2L, part1, 2L, Long.MIN_VALUE); - fetcher.emitRecord(3L, part1, 3L, Long.MIN_VALUE); + fetcher.emitRecord(1L, part1, 1L); + fetcher.emitRecord(2L, part1, 2L); + fetcher.emitRecord(3L, part1, 3L); assertEquals(3L, sourceContext.getLatestElement().getValue().longValue()); assertEquals(3L, sourceContext.getLatestElement().getTimestamp()); // elements for partition 2 - fetcher.emitRecord(12L, part2, 1L, Long.MIN_VALUE); + fetcher.emitRecord(12L, part2, 1L); assertEquals(12L, sourceContext.getLatestElement().getValue().longValue()); assertEquals(12L, sourceContext.getLatestElement().getTimestamp()); // elements for partition 3 - fetcher.emitRecord(101L, part3, 1L, Long.MIN_VALUE); - fetcher.emitRecord(102L, part3, 2L, Long.MIN_VALUE); + fetcher.emitRecord(101L, part3, 1L); + fetcher.emitRecord(102L, part3, 2L); assertEquals(102L, sourceContext.getLatestElement().getValue().longValue()); assertEquals(102L, sourceContext.getLatestElement().getTimestamp()); @@ -164,14 +163,14 @@ public class AbstractFetcherTimestampsTest { assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp()); // advance partition 3 - fetcher.emitRecord(1003L, part3, 3L, Long.MIN_VALUE); - fetcher.emitRecord(1004L, part3, 4L, Long.MIN_VALUE); - fetcher.emitRecord(1005L, part3, 5L, Long.MIN_VALUE); + fetcher.emitRecord(1003L, part3, 3L); + fetcher.emitRecord(1004L, part3, 4L); + fetcher.emitRecord(1005L, part3, 5L); assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue()); assertEquals(1005L, sourceContext.getLatestElement().getTimestamp()); // advance partition 1 beyond partition 2 - this bumps the watermark - fetcher.emitRecord(30L, part1, 4L, Long.MIN_VALUE); + fetcher.emitRecord(30L, part1, 4L); assertEquals(30L, sourceContext.getLatestElement().getValue().longValue()); assertEquals(30L, sourceContext.getLatestElement().getTimestamp()); @@ -181,9 +180,9 @@ public class AbstractFetcherTimestampsTest { assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp()); // advance partition 2 again - this bumps the watermark - fetcher.emitRecord(13L, part2, 2L, Long.MIN_VALUE); - fetcher.emitRecord(14L, part2, 3L, Long.MIN_VALUE); - fetcher.emitRecord(15L, part2, 3L, Long.MIN_VALUE); + fetcher.emitRecord(13L, part2, 2L); + fetcher.emitRecord(14L, part2, 3L); + fetcher.emitRecord(15L, part2, 3L); processingTimeService.setCurrentTime(30); // this blocks until the periodic thread emitted the watermark
