This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7c6a4aae336 [fix][test] Fixed Nondeterministic Assertions in KafkaAbstractSinkTest (#24877)
7c6a4aae336 is described below
commit 7c6a4aae336a631e0b88998cf54c656c2f193ba4
Author: Lucas Eby <[email protected]>
AuthorDate: Mon Oct 27 04:09:14 2025 -0500
[fix][test] Fixed Nondeterministic Assertions in KafkaAbstractSinkTest (#24877)
---
.../io/kafka/sink/KafkaAbstractSinkTest.java | 88 +++++++++++++++++-----
1 file changed, 68 insertions(+), 20 deletions(-)
diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
index c0c0a9a7172..f8c5b1df4c4 100644
--- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
+++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
@@ -26,9 +26,12 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.function.Function;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.security.auth.SecurityProtocol;
@@ -65,7 +68,7 @@ public class KafkaAbstractSinkTest {
} catch (Throwable e) {
if (expectedType.isInstance(e)) {
T ex = expectedType.cast(e);
- assertEquals(expectedMessage, ex.getMessage());
+ assertEquals(ex.getMessage(), expectedMessage);
return;
}
throw new AssertionError("Unexpected exception type, expected " +
expectedType.getSimpleName()
@@ -74,10 +77,24 @@ public class KafkaAbstractSinkTest {
throw new AssertionError("Expected exception");
}
+ /**
+ * Creates a valid Kafka Sink configuration that is used by multiple test cases.
+ *
+ * @return a map containing all required Kafka sink configuration fields
+ */
+ private static Map<String, Object> validConfig() {
+ Map<String, Object> map = new HashMap<>();
+ map.put("bootstrapServers", "localhost:6667");
+ map.put("acks", "1");
+ map.put("topic", "topic_2");
+ map.put("batchSize", "16384");
+ map.put("maxRequestSize", "1048576");
+ return map;
+ }
+
@Test
public void testInvalidConfigWillThrownException() throws Exception {
KafkaAbstractSink<String, byte[]> sink = new DummySink();
- Map<String, Object> config = new HashMap<>();
SinkContext sc = new SinkContext() {
@Override
public int getInstanceId() {
@@ -189,7 +206,7 @@ public class KafkaAbstractSinkTest {
}
};
- ThrowingRunnable openAndClose = ()->{
+ Function<Map<String, Object>, ThrowingRunnable> runWith = config -> ()
-> {
try {
sink.open(config, sc);
fail();
@@ -197,23 +214,54 @@ public class KafkaAbstractSinkTest {
sink.close();
}
};
- expectThrows(IllegalArgumentException.class, "bootstrapServers cannot
be null", openAndClose);
- config.put("bootstrapServers", "localhost:6667");
- expectThrows(IllegalArgumentException.class, "acks cannot be null",
openAndClose);
- config.put("acks", "1");
- expectThrows(IllegalArgumentException.class, "topic cannot be null",
openAndClose);
- config.put("topic", "topic_2");
- config.put("batchSize", "-1");
- expectThrows(IllegalArgumentException.class, "Invalid Kafka Producer
batchSize : -1", openAndClose);
- config.put("batchSize", "16384");
- config.put("maxRequestSize", "-1");
- expectThrows(IllegalArgumentException.class, "Invalid Kafka Producer
maxRequestSize : -1", openAndClose);
- config.put("maxRequestSize", "1048576");
- config.put("acks", "none");
- expectThrows(ConfigException.class,
- "Invalid value none for configuration acks: String must be one
of: all, -1, 0, 1",
- openAndClose);
- config.put("acks", "1");
+
+ // Table of test cases for key removal and modification tests
+ record Case(
+ Consumer<Map<String, Object>> mutate,
+ Class<? extends Exception> expectedType,
+ String expectedMessage
+ ) {}
+
+ List<Case> cases = List.of(
+ // Missing bootstrapServers
+ new Case(config -> config.remove("bootstrapServers"),
+ IllegalArgumentException.class,
+ "bootstrapServers cannot be null"),
+
+ // Missing acks
+ new Case(config -> config.remove("acks"),
+ IllegalArgumentException.class,
+ "acks cannot be null"),
+
+ // Missing topic
+ new Case(config -> config.remove("topic"),
+ IllegalArgumentException.class,
+ "topic cannot be null"),
+
+ // Bad batchSize
+ new Case(config -> config.put("batchSize", "-1"),
+ IllegalArgumentException.class,
+ "Invalid Kafka Producer batchSize : -1"),
+
+ // Bad maxRequestSize
+ new Case(config -> config.put("maxRequestSize", "-1"),
+ IllegalArgumentException.class,
+ "Invalid Kafka Producer maxRequestSize : -1"),
+
+ // Invalid acks value
+ new Case(config -> config.put("acks", "none"),
+ ConfigException.class,
+ "Invalid value none for configuration acks: String
must be one of: all, -1, 0, 1")
+ );
+
+ for (Case currCase : cases) {
+ var config = validConfig(); // set fresh, valid, baseline each time
+ currCase.mutate.accept(config); // remove or change one field
+ expectThrows(currCase.expectedType, currCase.expectedMessage,
runWith.apply(config));
+ }
+
+ // Finally verify a valid config passes cleanly
+ var config = validConfig();
sink.open(config, sc);
sink.close();
}