This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
commit ffa11654b67885cf7beb823182e640b863162d72 Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org> AuthorDate: Wed Apr 5 15:08:39 2023 -0700 [hotfix] Refactor MockPartitionOffsetsRetriever as a common test utility --- .../kafka/table/KafkaDynamicTableFactoryTest.java | 72 +-------------- .../table/UpsertKafkaDynamicTableFactoryTest.java | 72 +-------------- .../testutils/MockPartitionOffsetsRetriever.java | 102 +++++++++++++++++++++ 3 files changed, 104 insertions(+), 142 deletions(-) diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java index a0b74a5b..7ab05035 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java @@ -46,6 +46,7 @@ import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanStartupMode; +import org.apache.flink.streaming.connectors.kafka.testutils.MockPartitionOffsetsRetriever; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.Column; @@ -87,7 +88,6 @@ import org.junit.jupiter.params.provider.ValueSource; import javax.annotation.Nullable; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -97,7 +97,6 @@ import java.util.Optional; import java.util.Properties; import java.util.Set; import 
java.util.function.Consumer; -import java.util.function.Function; import java.util.regex.Pattern; import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; @@ -577,75 +576,6 @@ public class KafkaDynamicTableFactoryTest { validator.accept(kafkaSource); } - private interface OffsetsRetriever - extends Function<Collection<TopicPartition>, Map<TopicPartition, Long>> {} - - private interface TimestampOffsetsRetriever - extends Function<Map<TopicPartition, Long>, Map<TopicPartition, OffsetAndTimestamp>> {} - - private static final class MockPartitionOffsetsRetriever - implements OffsetsInitializer.PartitionOffsetsRetriever { - - public static final OffsetsRetriever UNSUPPORTED_RETRIEVAL = - partitions -> { - throw new UnsupportedOperationException( - "The method was not supposed to be called"); - }; - private final OffsetsRetriever committedOffsets; - private final OffsetsRetriever endOffsets; - private final OffsetsRetriever beginningOffsets; - private final TimestampOffsetsRetriever offsetsForTimes; - - static MockPartitionOffsetsRetriever noInteractions() { - return new MockPartitionOffsetsRetriever( - UNSUPPORTED_RETRIEVAL, - UNSUPPORTED_RETRIEVAL, - UNSUPPORTED_RETRIEVAL, - partitions -> { - throw new UnsupportedOperationException( - "The method was not supposed to be called"); - }); - } - - static MockPartitionOffsetsRetriever timestampAndEnd( - TimestampOffsetsRetriever retriever, OffsetsRetriever endOffsets) { - return new MockPartitionOffsetsRetriever( - UNSUPPORTED_RETRIEVAL, endOffsets, UNSUPPORTED_RETRIEVAL, retriever); - } - - private MockPartitionOffsetsRetriever( - OffsetsRetriever committedOffsets, - OffsetsRetriever endOffsets, - OffsetsRetriever beginningOffsets, - TimestampOffsetsRetriever offsetsForTimes) { - this.committedOffsets = committedOffsets; - this.endOffsets = endOffsets; - this.beginningOffsets = beginningOffsets; - this.offsetsForTimes = offsetsForTimes; - } - - @Override - public Map<TopicPartition, Long> 
committedOffsets(Collection<TopicPartition> partitions) { - return committedOffsets.apply(partitions); - } - - @Override - public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) { - return endOffsets.apply(partitions); - } - - @Override - public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) { - return beginningOffsets.apply(partitions); - } - - @Override - public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes( - Map<TopicPartition, Long> timestampsToSearch) { - return offsetsForTimes.apply(timestampsToSearch); - } - } - @Test public void testTableSink() { final Map<String, String> modifiedOptions = diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java index 959c44cf..ba424f79 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java @@ -39,6 +39,7 @@ import org.apache.flink.streaming.api.operators.StreamOperatorFactory; import org.apache.flink.streaming.api.transformations.SourceTransformation; import org.apache.flink.streaming.connectors.kafka.config.BoundedMode; import org.apache.flink.streaming.connectors.kafka.config.StartupMode; +import org.apache.flink.streaming.connectors.kafka.testutils.MockPartitionOffsetsRetriever; import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.ValidationException; @@ -74,14 +75,12 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; 
import java.util.HashMap; import java.util.Map; import java.util.Optional; import java.util.Properties; import java.util.function.Consumer; -import java.util.function.Function; import static org.apache.flink.core.testutils.FlinkMatchers.containsCause; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanBoundedMode; @@ -819,73 +818,4 @@ public class UpsertKafkaDynamicTableFactoryTest extends TestLogger { final KafkaSource<?> kafkaSource = assertKafkaSource(provider); validator.accept(kafkaSource); } - - private interface OffsetsRetriever - extends Function<Collection<TopicPartition>, Map<TopicPartition, Long>> {} - - private interface TimestampOffsetsRetriever - extends Function<Map<TopicPartition, Long>, Map<TopicPartition, OffsetAndTimestamp>> {} - - private static final class MockPartitionOffsetsRetriever - implements OffsetsInitializer.PartitionOffsetsRetriever { - - public static final OffsetsRetriever UNSUPPORTED_RETRIEVAL = - partitions -> { - throw new UnsupportedOperationException( - "The method was not supposed to be called"); - }; - private final OffsetsRetriever committedOffsets; - private final OffsetsRetriever endOffsets; - private final OffsetsRetriever beginningOffsets; - private final TimestampOffsetsRetriever offsetsForTimes; - - static MockPartitionOffsetsRetriever noInteractions() { - return new MockPartitionOffsetsRetriever( - UNSUPPORTED_RETRIEVAL, - UNSUPPORTED_RETRIEVAL, - UNSUPPORTED_RETRIEVAL, - partitions -> { - throw new UnsupportedOperationException( - "The method was not supposed to be called"); - }); - } - - static MockPartitionOffsetsRetriever timestampAndEnd( - TimestampOffsetsRetriever retriever, OffsetsRetriever endOffsets) { - return new MockPartitionOffsetsRetriever( - UNSUPPORTED_RETRIEVAL, endOffsets, UNSUPPORTED_RETRIEVAL, retriever); - } - - private MockPartitionOffsetsRetriever( - OffsetsRetriever committedOffsets, - OffsetsRetriever endOffsets, - OffsetsRetriever beginningOffsets, - 
TimestampOffsetsRetriever offsetsForTimes) { - this.committedOffsets = committedOffsets; - this.endOffsets = endOffsets; - this.beginningOffsets = beginningOffsets; - this.offsetsForTimes = offsetsForTimes; - } - - @Override - public Map<TopicPartition, Long> committedOffsets(Collection<TopicPartition> partitions) { - return committedOffsets.apply(partitions); - } - - @Override - public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) { - return endOffsets.apply(partitions); - } - - @Override - public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) { - return beginningOffsets.apply(partitions); - } - - @Override - public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes( - Map<TopicPartition, Long> timestampsToSearch) { - return offsetsForTimes.apply(timestampsToSearch); - } - } } diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockPartitionOffsetsRetriever.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockPartitionOffsetsRetriever.java new file mode 100644 index 00000000..175bddd6 --- /dev/null +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockPartitionOffsetsRetriever.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.testutils; + +import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; + +import org.apache.kafka.clients.consumer.OffsetAndTimestamp; +import org.apache.kafka.common.TopicPartition; + +import java.util.Collection; +import java.util.Map; +import java.util.function.Function; + +/** Fake {@link OffsetsInitializer.PartitionOffsetsRetriever} for unit tests. */ +public final class MockPartitionOffsetsRetriever + implements OffsetsInitializer.PartitionOffsetsRetriever { + + /** Fake offsets retriever for a given set of topic partitions. */ + public interface OffsetsRetriever + extends Function<Collection<TopicPartition>, Map<TopicPartition, Long>> {} + + /** + * Fake offsets retriever for a given set of topic partitions and their target timestamp + * position. 
+ */ + public interface TimestampOffsetsRetriever + extends Function<Map<TopicPartition, Long>, Map<TopicPartition, OffsetAndTimestamp>> {} + + public static final OffsetsRetriever UNSUPPORTED_RETRIEVAL = + partitions -> { + throw new UnsupportedOperationException("The method was not supposed to be called"); + }; + private final OffsetsRetriever committedOffsets; + private final OffsetsRetriever endOffsets; + private final OffsetsRetriever beginningOffsets; + private final TimestampOffsetsRetriever offsetsForTimes; + + public static MockPartitionOffsetsRetriever noInteractions() { + return new MockPartitionOffsetsRetriever( + UNSUPPORTED_RETRIEVAL, + UNSUPPORTED_RETRIEVAL, + UNSUPPORTED_RETRIEVAL, + partitions -> { + throw new UnsupportedOperationException( + "The method was not supposed to be called"); + }); + } + + public static MockPartitionOffsetsRetriever timestampAndEnd( + TimestampOffsetsRetriever retriever, OffsetsRetriever endOffsets) { + return new MockPartitionOffsetsRetriever( + UNSUPPORTED_RETRIEVAL, endOffsets, UNSUPPORTED_RETRIEVAL, retriever); + } + + private MockPartitionOffsetsRetriever( + OffsetsRetriever committedOffsets, + OffsetsRetriever endOffsets, + OffsetsRetriever beginningOffsets, + TimestampOffsetsRetriever offsetsForTimes) { + this.committedOffsets = committedOffsets; + this.endOffsets = endOffsets; + this.beginningOffsets = beginningOffsets; + this.offsetsForTimes = offsetsForTimes; + } + + @Override + public Map<TopicPartition, Long> committedOffsets(Collection<TopicPartition> partitions) { + return committedOffsets.apply(partitions); + } + + @Override + public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) { + return endOffsets.apply(partitions); + } + + @Override + public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) { + return beginningOffsets.apply(partitions); + } + + @Override + public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes( + Map<TopicPartition, 
Long> timestampsToSearch) { + return offsetsForTimes.apply(timestampsToSearch); + } +}