This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
commit 39a1f2beb809ff00f180f711e110314b8ac6ac7b Author: Arvid Heise <[email protected]> AuthorDate: Thu Apr 10 14:54:13 2025 +0200 [FLINK-37644] Remove unused code and commons-collection --- .../streaming/connectors/kafka/KafkaTestBase.java | 96 ---------------------- .../connectors/kafka/KafkaTestEnvironment.java | 6 -- .../connectors/kafka/KafkaTestEnvironmentImpl.java | 10 --- 3 files changed, 112 deletions(-) diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java index eb35391f..c21ed14c 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java @@ -33,7 +33,6 @@ import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.TestLogger; import com.google.common.base.MoreObjects; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; @@ -48,17 +47,12 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; -import java.util.HashSet; import java.util.List; import java.util.Properties; -import java.util.Set; import java.util.concurrent.atomic.AtomicReference; -import static org.assertj.core.api.Assertions.fail; - /** * The base for the Kafka tests. It brings up: * @@ -278,96 +272,6 @@ public abstract class KafkaTestBase extends TestLogger { } } - /** - * We manually handle the timeout instead of using JUnit's timeout to return failure instead of - * timeout error. After timeout we assume that there are missing records and there is a bug, not - * that the test has run out of time. 
- */ - public void assertAtLeastOnceForTopic( - Properties properties, - String topic, - int partition, - Set<Integer> expectedElements, - long timeoutMillis) - throws Exception { - - long startMillis = System.currentTimeMillis(); - Set<Integer> actualElements = new HashSet<>(); - - // until we timeout... - while (System.currentTimeMillis() < startMillis + timeoutMillis) { - properties.put( - "key.deserializer", - "org.apache.kafka.common.serialization.IntegerDeserializer"); - properties.put( - "value.deserializer", - "org.apache.kafka.common.serialization.IntegerDeserializer"); - // We need to set these two properties so that they are lower than request.timeout.ms. - // This is - // required for some old KafkaConsumer versions. - properties.put("session.timeout.ms", "2000"); - properties.put("heartbeat.interval.ms", "500"); - - // query kafka for new records ... - Collection<ConsumerRecord<Integer, Integer>> records = - kafkaServer.getAllRecordsFromTopic(properties, topic); - - for (ConsumerRecord<Integer, Integer> record : records) { - actualElements.add(record.value()); - } - - // succeed if we got all expectedElements - if (actualElements.containsAll(expectedElements)) { - return; - } - } - - fail( - String.format( - "Expected to contain all of: <%s>, but was: <%s>", - expectedElements, actualElements)); - } - - public void assertExactlyOnceForTopic( - Properties properties, String topic, List<Integer> expectedElements) { - - List<Integer> actualElements = new ArrayList<>(); - - Properties consumerProperties = new Properties(); - consumerProperties.putAll(properties); - consumerProperties.put( - "key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); - consumerProperties.put( - "value.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); - consumerProperties.put("isolation.level", "read_committed"); - - // query kafka for new records ... 
- Collection<ConsumerRecord<byte[], byte[]>> records = - kafkaServer.getAllRecordsFromTopic(consumerProperties, topic); - - for (ConsumerRecord<byte[], byte[]> record : records) { - actualElements.add(ByteBuffer.wrap(record.value()).getInt()); - } - - // succeed if we got all expectedElements - if (actualElements.equals(expectedElements)) { - return; - } - - fail( - String.format( - "Expected %s, but was: %s", - formatElements(expectedElements), formatElements(actualElements))); - } - - private String formatElements(List<Integer> elements) { - if (elements.size() > 50) { - return String.format("number of elements: <%s>", elements.size()); - } else { - return String.format("elements: <%s>", elements); - } - } - public static void setNumKafkaClusters(int size) { numKafkaClusters = size; } diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java index 42932f5d..77c40a81 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java @@ -17,9 +17,6 @@ package org.apache.flink.streaming.connectors.kafka; -import org.apache.kafka.clients.consumer.ConsumerRecord; - -import java.util.Collection; import java.util.Map; import java.util.Properties; import java.util.UUID; @@ -110,9 +107,6 @@ public abstract class KafkaTestEnvironment { return props; } - public abstract <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic( - Properties properties, String topic); - // -- offset handlers /** Simple interface to commit and retrieve offsets. 
*/ diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index 52d3b9dc..0a65e912 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -21,12 +21,10 @@ import org.apache.flink.connector.kafka.testutils.DockerImageVersions; import org.apache.flink.connector.kafka.testutils.KafkaUtil; import org.apache.flink.core.testutils.CommonTestUtils; -import org.apache.commons.collections.list.UnmodifiableList; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.admin.TopicListing; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; @@ -42,7 +40,6 @@ import javax.annotation.Nullable; import java.time.Duration; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -220,13 +217,6 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { return DockerImageVersions.KAFKA; } - @Override - @SuppressWarnings("unchecked") - public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic( - Properties properties, String topic) { - return UnmodifiableList.decorate(KafkaUtil.drainAllRecordsFromTopic(topic, properties)); - } - @Override public KafkaOffsetHandler createOffsetHandler() { return new KafkaOffsetHandlerImpl();
