This is an automated email from the ASF dual-hosted git repository. MartijnVisser pushed a commit to branch v4.0 in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
commit 9b30d771b1cc9344059c2e192e8a72d083440197
Author: Aleksandr Savonin <[email protected]>
AuthorDate: Mon May 18 23:24:26 2026 +0200

    [FLINK-39699][tests] Stabilize KafkaWriterFaultToleranceITCase exception-on-unavailable tests

    testFlush/testWrite/testCloseExceptionWhenKafkaUnavailable all rely on
    the producer still having undelivered work when KAFKA_CONTAINER.stop()
    takes effect. Under CI load the sender thread can ship and ack the
    buffered record before stop() returns, so the operation under test
    (flush/write/close) has nothing to fail on and the .rootCause()
    assertion fires with "Expecting actual not to be null" instead of
    seeing the expected NetworkException / TimeoutException.

    Drain a warm-up record before stopping the broker, then issue the real
    write while the broker is down. The producer's metadata is already
    cached so write() returns immediately; the sender fails to deliver
    (retries=0); the operation under test reliably surfaces the underlying
    exception.

    (cherry picked from commit 887d5941d5b6dee2044b55cac629f040336f7435)

    Generated-by: Claude Code (Opus 4.8)
---
 .../flink/connector/kafka/sink/KafkaWriterFaultToleranceITCase.java | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterFaultToleranceITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterFaultToleranceITCase.java
index 4a7d25c6..1d83cde6 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterFaultToleranceITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterFaultToleranceITCase.java
@@ -61,10 +61,12 @@ public class KafkaWriterFaultToleranceITCase extends KafkaWriterTestBase {
new SinkInitContext(metricGroup, timeService, null))) { writer.write(1, SINK_WRITER_CONTEXT); + writer.getCurrentProducer().flush(); KAFKA_CONTAINER.stop(); try { + writer.write(1, 
SINK_WRITER_CONTEXT); writer.getCurrentProducer().flush(); assertThatCode(() -> writer.write(1, SINK_WRITER_CONTEXT)) .rootCause() @@ -84,9 +86,11 @@ public class KafkaWriterFaultToleranceITCase extends KafkaWriterTestBase { DeliveryGuarantee.AT_LEAST_ONCE, new SinkInitContext(metricGroup, timeService, null))) { writer.write(1, SINK_WRITER_CONTEXT); + writer.flush(false); KAFKA_CONTAINER.stop(); try { + writer.write(1, SINK_WRITER_CONTEXT); assertThatCode(() -> writer.flush(false)) .rootCause() .isInstanceOfAny(NetworkException.class, TimeoutException.class); @@ -106,10 +110,12 @@ public class KafkaWriterFaultToleranceITCase extends KafkaWriterTestBase { new SinkInitContext(metricGroup, timeService, null)); writer.write(1, SINK_WRITER_CONTEXT); + writer.getCurrentProducer().flush(); KAFKA_CONTAINER.stop(); try { + writer.write(1, SINK_WRITER_CONTEXT); writer.getCurrentProducer().flush(); // closing producer resource throws exception first assertThatCode(() -> writer.close())
