This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch v4.0
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

commit 4856934262e06bbd95535f95623880b8e7bf0b66
Author: Arvid Heise <[email protected]>
AuthorDate: Thu Apr 10 14:54:13 2025 +0200

    [FLINK-37644] Remove unused code and commons-collections
---
 .../streaming/connectors/kafka/KafkaTestBase.java  | 96 ----------------------
 .../connectors/kafka/KafkaTestEnvironment.java     |  6 --
 .../connectors/kafka/KafkaTestEnvironmentImpl.java | 10 ---
 3 files changed, 112 deletions(-)

diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index eb35391f..c21ed14c 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -33,7 +33,6 @@ import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
 
 import com.google.common.base.MoreObjects;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -48,17 +47,12 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
-import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 
-import static org.assertj.core.api.Assertions.fail;
-
 /**
  * The base for the Kafka tests. It brings up:
  *
@@ -278,96 +272,6 @@ public abstract class KafkaTestBase extends TestLogger {
         }
     }
 
-    /**
-     * We manually handle the timeout instead of using JUnit's timeout to return failure instead of
-     * timeout error. After timeout we assume that there are missing records and there is a bug, not
-     * that the test has run out of time.
-     */
-    public void assertAtLeastOnceForTopic(
-            Properties properties,
-            String topic,
-            int partition,
-            Set<Integer> expectedElements,
-            long timeoutMillis)
-            throws Exception {
-
-        long startMillis = System.currentTimeMillis();
-        Set<Integer> actualElements = new HashSet<>();
-
-        // until we timeout...
-        while (System.currentTimeMillis() < startMillis + timeoutMillis) {
-            properties.put(
-                    "key.deserializer",
-                    "org.apache.kafka.common.serialization.IntegerDeserializer");
-            properties.put(
-                    "value.deserializer",
-                    "org.apache.kafka.common.serialization.IntegerDeserializer");
-            // We need to set these two properties so that they are lower than request.timeout.ms.
-            // This is
-            // required for some old KafkaConsumer versions.
-            properties.put("session.timeout.ms", "2000");
-            properties.put("heartbeat.interval.ms", "500");
-
-            // query kafka for new records ...
-            Collection<ConsumerRecord<Integer, Integer>> records =
-                    kafkaServer.getAllRecordsFromTopic(properties, topic);
-
-            for (ConsumerRecord<Integer, Integer> record : records) {
-                actualElements.add(record.value());
-            }
-
-            // succeed if we got all expectedElements
-            if (actualElements.containsAll(expectedElements)) {
-                return;
-            }
-        }
-
-        fail(
-                String.format(
-                        "Expected to contain all of: <%s>, but was: <%s>",
-                        expectedElements, actualElements));
-    }
-
-    public void assertExactlyOnceForTopic(
-            Properties properties, String topic, List<Integer> expectedElements) {
-
-        List<Integer> actualElements = new ArrayList<>();
-
-        Properties consumerProperties = new Properties();
-        consumerProperties.putAll(properties);
-        consumerProperties.put(
-                "key.deserializer", 
"org.apache.kafka.common.serialization.IntegerDeserializer");
-        consumerProperties.put(
-                "value.deserializer", 
"org.apache.kafka.common.serialization.IntegerDeserializer");
-        consumerProperties.put("isolation.level", "read_committed");
-
-        // query kafka for new records ...
-        Collection<ConsumerRecord<byte[], byte[]>> records =
-                kafkaServer.getAllRecordsFromTopic(consumerProperties, topic);
-
-        for (ConsumerRecord<byte[], byte[]> record : records) {
-            actualElements.add(ByteBuffer.wrap(record.value()).getInt());
-        }
-
-        // succeed if we got all expectedElements
-        if (actualElements.equals(expectedElements)) {
-            return;
-        }
-
-        fail(
-                String.format(
-                        "Expected %s, but was: %s",
-                        formatElements(expectedElements), formatElements(actualElements)));
-    }
-
-    private String formatElements(List<Integer> elements) {
-        if (elements.size() > 50) {
-            return String.format("number of elements: <%s>", elements.size());
-        } else {
-            return String.format("elements: <%s>", elements);
-        }
-    }
-
     public static void setNumKafkaClusters(int size) {
         numKafkaClusters = size;
     }
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index 42932f5d..77c40a81 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -17,9 +17,6 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
-import java.util.Collection;
 import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
@@ -110,9 +107,6 @@ public abstract class KafkaTestEnvironment {
         return props;
     }
 
-    public abstract <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(
-            Properties properties, String topic);
-
     // -- offset handlers
 
     /** Simple interface to commit and retrieve offsets. */
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 52d3b9dc..0a65e912 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -21,12 +21,10 @@ import org.apache.flink.connector.kafka.testutils.DockerImageVersions;
 import org.apache.flink.connector.kafka.testutils.KafkaUtil;
 import org.apache.flink.core.testutils.CommonTestUtils;
 
-import org.apache.commons.collections.list.UnmodifiableList;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.admin.TopicListing;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
@@ -42,7 +40,6 @@ import javax.annotation.Nullable;
 
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -220,13 +217,6 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
         return DockerImageVersions.KAFKA;
     }
 
-    @Override
-    @SuppressWarnings("unchecked")
-    public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(
-            Properties properties, String topic) {
-        return UnmodifiableList.decorate(KafkaUtil.drainAllRecordsFromTopic(topic, properties));
-    }
-
     @Override
     public KafkaOffsetHandler createOffsetHandler() {
         return new KafkaOffsetHandlerImpl();

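The removed KafkaTestEnvironmentImpl#getAllRecordsFromTopic was the last use of commons-collections
in these tests, wrapping the drained records in UnmodifiableList.decorate. If a read-only view of
the drained records is ever needed again, the JDK alone covers it. The sketch below is illustrative
only, not part of this commit; the generic types and the KafkaUtil.drainAllRecordsFromTopic
signature are assumed from the removed code rather than taken from the current API.

    import org.apache.kafka.clients.consumer.ConsumerRecord;

    import java.util.Collection;
    import java.util.Collections;
    import java.util.List;
    import java.util.Properties;

    // Sketch only: java.util.Collections.unmodifiableList provides the same read-only wrapping
    // that commons-collections' UnmodifiableList.decorate gave in the removed implementation.
    @SuppressWarnings("unchecked")
    public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(
            Properties properties, String topic) {
        // Assumes KafkaUtil.drainAllRecordsFromTopic(topic, properties) returns a List of
        // ConsumerRecords, as the unchecked cast in the removed implementation implies.
        List<?> drained = KafkaUtil.drainAllRecordsFromTopic(topic, properties);
        return (Collection<ConsumerRecord<K, V>>) Collections.unmodifiableList(drained);
    }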