This is an automated email from the ASF dual-hosted git repository.

hong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git


The following commit(s) were added to refs/heads/main by this push:
     new 6cfa13d  [FLINK-35815][Connector/Kinesis] Fix detection of recoverable exceptions for EFO operations
6cfa13d is described below

commit 6cfa13d64e090887552ad2a3d4f6a59b8d599b28
Author: Aleksandr Pilipenko <[email protected]>
AuthorDate: Sun Aug 4 16:51:31 2024 +0100

    [FLINK-35815][Connector/Kinesis] Fix detection of recoverable exceptions for EFO operations
---
 .../streaming/connectors/kinesis/util/AwsV2Util.java      | 11 ++++++++---
 .../streaming/connectors/kinesis/util/AwsV2UtilTest.java  | 15 ++++++++++++++-
 2 files changed, 22 insertions(+), 4 deletions(-)

diff --git a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java
index aa62a65..f467f17 100644
--- a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java
+++ b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.connectors.kinesis.util;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.ExceptionUtils;
 
 import software.amazon.awssdk.http.SdkHttpConfigurationOption;
 import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
@@ -74,8 +75,12 @@ public class AwsV2Util {
     }
 
     public static boolean isRecoverableException(Exception e) {
-        Throwable cause = e.getCause();
-        return cause instanceof LimitExceededException
-                || cause instanceof ProvisionedThroughputExceededException;
+        return ExceptionUtils.findThrowable(
+                        e,
+                        throwable ->
+                                throwable instanceof LimitExceededException
+                                        || throwable
+                                                instanceof ProvisionedThroughputExceededException)
+                .isPresent();
     }
 }
diff --git a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java
index 79cd746..bcfc079 100644
--- a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java
+++ b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kinesis.util;
 import org.junit.Test;
 import software.amazon.awssdk.http.SdkHttpConfigurationOption;
 import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
+import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException;
 import software.amazon.awssdk.utils.AttributeMap;
 
 import java.time.Duration;
@@ -133,7 +134,19 @@ public class AwsV2UtilTest {
     }
 
     @Test
-    public void testIsRecoverableExceptionForRecoverable() {
+    public void testIsRecoverableExceptionForRecoverableLimitExceeded() {
+        Exception recoverable = LimitExceededException.builder().build();
+        assertThat(AwsV2Util.isRecoverableException(recoverable)).isTrue();
+    }
+
+    @Test
+    public void testIsRecoverableExceptionForRecoverableProvisionedThroughputExceeded() {
+        Exception recoverable = ProvisionedThroughputExceededException.builder().build();
+        assertThat(AwsV2Util.isRecoverableException(recoverable)).isTrue();
+    }
+
+    @Test
+    public void testIsRecoverableExceptionForRecoverableWrapped() {
         Exception recoverable = LimitExceededException.builder().build();
         assertThat(AwsV2Util.isRecoverableException(new ExecutionException(recoverable))).isTrue();
     }

Reply via email to