This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

commit 2b199b0cc2ee84c10558b12b30094a6e37dceafe
Author: Arvid Heise <[email protected]>
AuthorDate: Tue Apr 8 08:40:03 2025 +0200

    [hotfix] Harden KafkaWriterFaultToleranceITCase
    
    Make sure that TimeoutExceptions are also valid instead of 
NetworkExceptions.
---
 .../connector/kafka/sink/KafkaWriterFaultToleranceITCase.java | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterFaultToleranceITCase.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterFaultToleranceITCase.java
index a3a9bc9c..4a7d25c6 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterFaultToleranceITCase.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterFaultToleranceITCase.java
@@ -68,10 +68,7 @@ public class KafkaWriterFaultToleranceITCase extends 
KafkaWriterTestBase {
                 writer.getCurrentProducer().flush();
                 assertThatCode(() -> writer.write(1, SINK_WRITER_CONTEXT))
                         .rootCause()
-                        .matches(
-                                e ->
-                                        e instanceof NetworkException
-                                                || e instanceof 
TimeoutException);
+                        .isInstanceOfAny(NetworkException.class, 
TimeoutException.class);
             } finally {
                 KAFKA_CONTAINER.start();
             }
@@ -91,7 +88,8 @@ public class KafkaWriterFaultToleranceITCase extends 
KafkaWriterTestBase {
             KAFKA_CONTAINER.stop();
             try {
                 assertThatCode(() -> writer.flush(false))
-                        .hasRootCauseExactlyInstanceOf(NetworkException.class);
+                        .rootCause()
+                        .isInstanceOfAny(NetworkException.class, 
TimeoutException.class);
             } finally {
                 KAFKA_CONTAINER.start();
             }
@@ -115,7 +113,8 @@ public class KafkaWriterFaultToleranceITCase extends 
KafkaWriterTestBase {
             writer.getCurrentProducer().flush();
             // closing producer resource throws exception first
             assertThatCode(() -> writer.close())
-                    .hasRootCauseExactlyInstanceOf(NetworkException.class);
+                    .rootCause()
+                    .isInstanceOfAny(NetworkException.class, 
TimeoutException.class);
         } catch (Exception e) {
             writer.close();
             throw e;

Reply via email to