This is an automated email from the ASF dual-hosted git repository.
hong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git
The following commit(s) were added to refs/heads/main by this push:
new 6cfa13d [FLINK-35815][Connector/Kinesis] Fix detection of recoverable
exceptions for EFO operations
6cfa13d is described below
commit 6cfa13d64e090887552ad2a3d4f6a59b8d599b28
Author: Aleksandr Pilipenko <[email protected]>
AuthorDate: Sun Aug 4 16:51:31 2024 +0100
[FLINK-35815][Connector/Kinesis] Fix detection of recoverable exceptions
for EFO operations
---
.../streaming/connectors/kinesis/util/AwsV2Util.java | 11 ++++++++---
.../streaming/connectors/kinesis/util/AwsV2UtilTest.java | 15 ++++++++++++++-
2 files changed, 22 insertions(+), 4 deletions(-)
diff --git
a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java
b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java
index aa62a65..f467f17 100644
---
a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java
+++
b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.connectors.kinesis.util;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.ExceptionUtils;
import software.amazon.awssdk.http.SdkHttpConfigurationOption;
import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
@@ -74,8 +75,12 @@ public class AwsV2Util {
}
public static boolean isRecoverableException(Exception e) {
- Throwable cause = e.getCause();
- return cause instanceof LimitExceededException
- || cause instanceof ProvisionedThroughputExceededException;
+ return ExceptionUtils.findThrowable(
+ e,
+ throwable ->
+ throwable instanceof LimitExceededException
+ || throwable
+ instanceof
ProvisionedThroughputExceededException)
+ .isPresent();
}
}
diff --git
a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java
b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java
index 79cd746..bcfc079 100644
---
a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java
+++
b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kinesis.util;
import org.junit.Test;
import software.amazon.awssdk.http.SdkHttpConfigurationOption;
import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
+import
software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException;
import software.amazon.awssdk.utils.AttributeMap;
import java.time.Duration;
@@ -133,7 +134,19 @@ public class AwsV2UtilTest {
}
@Test
- public void testIsRecoverableExceptionForRecoverable() {
+ public void testIsRecoverableExceptionForRecoverableLimitExceeded() {
+ Exception recoverable = LimitExceededException.builder().build();
+ assertThat(AwsV2Util.isRecoverableException(recoverable)).isTrue();
+ }
+
+ @Test
+ public void
testIsRecoverableExceptionForRecoverableProvisionedThroughputExceeded() {
+ Exception recoverable =
ProvisionedThroughputExceededException.builder().build();
+ assertThat(AwsV2Util.isRecoverableException(recoverable)).isTrue();
+ }
+
+ @Test
+ public void testIsRecoverableExceptionForRecoverableWrapped() {
Exception recoverable = LimitExceededException.builder().build();
assertThat(AwsV2Util.isRecoverableException(new
ExecutionException(recoverable))).isTrue();
}