This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git


The following commit(s) were added to refs/heads/main by this push:
     new d166ee2  [FLINK-31492][Connectors/AWS] Add support for exception 
classification based on AWS error codes
d166ee2 is described below

commit d166ee24bdd2b238f1d909912ec1d038732ec1c4
Author: Samuel Siebenmann <siebe...@amazon.com>
AuthorDate: Wed Mar 22 14:19:12 2023 +0000

    [FLINK-31492][Connectors/AWS] Add support for exception classification 
based on AWS error codes
    
    - Add AWSExceptionClassifierUtil to create FatalExceptionClassifiers to 
classify AWSServiceExceptions
      based on AWSErrorDetails error code.
    - Add AWSExceptionHandler to provide improved semantics over 
FatalExceptionClassifier.
    - Move Firehose sink exception classifiers into their own class: 
AWSFirehoseExceptionClassifiers.
    - Add FatalExceptionClassifiers for FirehoseExceptions with errorCode 
"NotAuthorized" and "AccessDeniedException".
    - Update KinesisFirehoseSinkWriter to use new classifiers.
    - Update KinesisFirehoseSinkWriter to use AWSExceptionHandler for better 
readability.
---
 .../sink/throwable/AWSExceptionClassifierUtil.java |  68 +++++++++++++
 .../aws/sink/throwable/AWSExceptionHandler.java    |  55 ++++++++++
 .../throwable/AWSExceptionClassifierUtilTest.java  | 111 +++++++++++++++++++++
 .../sink/throwable/AWSExceptionHandlerTest.java    |  72 +++++++++++++
 .../sink/AWSFirehoseExceptionClassifiers.java      |  61 +++++++++++
 .../firehose/sink/KinesisFirehoseSinkWriter.java   |  70 ++++++-------
 .../sink/AWSFirehoseExceptionClassifiersTest.java  |  72 +++++++++++++
 7 files changed, 470 insertions(+), 39 deletions(-)

diff --git 
a/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/sink/throwable/AWSExceptionClassifierUtil.java
 
b/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/sink/throwable/AWSExceptionClassifierUtil.java
new file mode 100644
index 0000000..f772678
--- /dev/null
+++ 
b/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/sink/throwable/AWSExceptionClassifierUtil.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.aws.sink.throwable;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+import org.apache.flink.util.ExceptionUtils;
+
+import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Function;
+
+/**
+ * Util class to create {@link FatalExceptionClassifier} to classify {@link 
AwsServiceException}
+ * based on {@link AwsErrorDetails#errorCode()}.
+ */
+@Internal
+public class AWSExceptionClassifierUtil {
+
+    /**
+     * Creates a {@link FatalExceptionClassifier} that classifies an exception 
as fatal if a given
+     * exception contains an {@link AwsServiceException} of type 
serviceExceptionType and {@link
+     * AwsErrorDetails#errorCode()} errorCode.
+     *
+     * @param serviceExceptionType The specific {@link AwsServiceException} to 
look for in the
+     *     exception.
+     * @param errorCode The {@link AwsErrorDetails#errorCode()} for the passed 
serviceExceptionType.
+     * @param mapper The exception mapper to be used by the returned {@link 
AWSExceptionHandler}.
+     * @return A {@link FatalExceptionClassifier} classifying based on 
exception type and error
+     *     code.
+     */
+    public static FatalExceptionClassifier withAWSServiceErrorCode(
+            Class<? extends AwsServiceException> serviceExceptionType,
+            String errorCode,
+            Function<Throwable, Exception> mapper) {
+        return new FatalExceptionClassifier(
+                (err) -> {
+                    Optional<? extends AwsServiceException> exceptionOptional =
+                            ExceptionUtils.findThrowable(err, 
serviceExceptionType);
+                    if (!exceptionOptional.isPresent()) {
+                        return false;
+                    }
+
+                    AwsServiceException exception = exceptionOptional.get();
+                    return exception.awsErrorDetails() != null
+                            && Objects.equals(errorCode, 
exception.awsErrorDetails().errorCode());
+                },
+                mapper);
+    }
+}
diff --git 
a/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/sink/throwable/AWSExceptionHandler.java
 
b/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/sink/throwable/AWSExceptionHandler.java
new file mode 100644
index 0000000..97671ba
--- /dev/null
+++ 
b/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/sink/throwable/AWSExceptionHandler.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.aws.sink.throwable;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+
+import java.util.function.Consumer;
+
+/**
+ * Class to provide improved semantics over {@link 
FatalExceptionClassifier#isFatal(Throwable,
+ * Consumer)}. {@link FatalExceptionClassifier#isFatal(Throwable, Consumer)} 
returns `false` for a
+ * fatal exception and `true` for a non-fatal exception.
+ */
+@Internal
+public class AWSExceptionHandler {
+
+    private final FatalExceptionClassifier classifier;
+
+    private AWSExceptionHandler(FatalExceptionClassifier classifier) {
+        this.classifier = classifier;
+    }
+
+    public static AWSExceptionHandler withClassifier(FatalExceptionClassifier 
classifier) {
+        return new AWSExceptionHandler(classifier);
+    }
+
+    /**
+     * Passes a given {@link Throwable} t to a given {@link Consumer} consumer 
if the throwable is
+     * fatal. Returns `true` if the {@link Throwable} has been passed to the 
{@link Consumer} (i.e.
+     * it is fatal) and `false` otherwise.
+     *
+     * @param t a {@link Throwable}
+     * @param consumer a {@link Consumer} to call if the passed throwable t is 
fatal.
+     * @return `true` if t is fatal, `false` otherwise.
+     */
+    public boolean consumeIfFatal(Throwable t, Consumer<Exception> consumer) {
+        return !classifier.isFatal(t, consumer);
+    }
+}
diff --git 
a/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/sink/throwable/AWSExceptionClassifierUtilTest.java
 
b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/sink/throwable/AWSExceptionClassifierUtilTest.java
new file mode 100644
index 0000000..34b6fec
--- /dev/null
+++ 
b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/sink/throwable/AWSExceptionClassifierUtilTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.aws.sink.throwable;
+
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.services.sts.model.StsException;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Unit tests for {@link AWSExceptionClassifierUtil}. */
+public class AWSExceptionClassifierUtilTest {
+
+    @Test
+    public void 
shouldCreateFatalExceptionClassifierThatClassifiesAsFatalIfMatchingErrorCode() {
+        // given
+        AwsServiceException exception =
+                StsException.builder()
+                        .awsErrorDetails(
+                                AwsErrorDetails.builder()
+                                        .errorCode("NotAuthorizedException")
+                                        .build())
+                        .build();
+
+        FatalExceptionClassifier classifier =
+                AWSExceptionClassifierUtil.withAWSServiceErrorCode(
+                        StsException.class,
+                        "NotAuthorizedException",
+                        (err) -> new RuntimeException());
+
+        // when (FatalExceptionClassifier#isFatal returns false if exception 
is fatal)
+        boolean isFatal = !classifier.isFatal(exception, (err) -> {});
+
+        // then
+        assertTrue(isFatal);
+    }
+
+    @Test
+    public void
+            
shouldCreateFatalExceptionClassifierThatClassifiesAsNonFatalIfNotMatchingErrorCode()
 {
+        // given
+        AwsServiceException exception =
+                StsException.builder()
+                        .awsErrorDetails(
+                                
AwsErrorDetails.builder().errorCode("SomeOtherException").build())
+                        .build();
+
+        FatalExceptionClassifier classifier =
+                AWSExceptionClassifierUtil.withAWSServiceErrorCode(
+                        StsException.class,
+                        "NotAuthorizedException",
+                        (err) -> new RuntimeException());
+
+        // when (FatalExceptionClassifier#isFatal returns true if exception is 
non-fatal)
+        boolean isFatal = !classifier.isFatal(exception, (err) -> {});
+
+        // then
+        assertFalse(isFatal);
+    }
+
+    @Test
+    public void 
shouldCreateFatalExceptionClassifierThatAppliesThrowableMapper() {
+        // given
+        AwsServiceException exception =
+                StsException.builder()
+                        .awsErrorDetails(
+                                AwsErrorDetails.builder()
+                                        .errorCode("NotAuthorizedException")
+                                        .build())
+                        .build();
+
+        Exception mappedException =
+                new RuntimeException(
+                        
"shouldCreateFatalExceptionClassifierThatAppliesThrowableMapper");
+        FatalExceptionClassifier classifier =
+                AWSExceptionClassifierUtil.withAWSServiceErrorCode(
+                        StsException.class, "NotAuthorizedException", (err) -> 
mappedException);
+
+        Set<Exception> consumedExceptions = new HashSet<>();
+
+        // when
+        classifier.isFatal(exception, consumedExceptions::add);
+
+        // then mappedException has been consumed
+        assertEquals(1, consumedExceptions.size());
+        assertTrue(consumedExceptions.contains(mappedException));
+    }
+}
diff --git 
a/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/sink/throwable/AWSExceptionHandlerTest.java
 
b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/sink/throwable/AWSExceptionHandlerTest.java
new file mode 100644
index 0000000..c53cd37
--- /dev/null
+++ 
b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/sink/throwable/AWSExceptionHandlerTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.aws.sink.throwable;
+
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Unit tests for {@link AWSExceptionHandler}. */
+public class AWSExceptionHandlerTest {
+
+    private final RuntimeException mappedException =
+            new RuntimeException("AWSExceptionHandlerTest");
+
+    private final AWSExceptionHandler exceptionHandler =
+            AWSExceptionHandler.withClassifier(
+                    FatalExceptionClassifier.withRootCauseOfType(
+                            UnsupportedOperationException.class, (err) -> 
mappedException));
+
+    @Test
+    public void shouldReturnTrueIfFatal() {
+        assertTrue(
+                exceptionHandler.consumeIfFatal(new 
UnsupportedOperationException(), (err) -> {}));
+    }
+
+    @Test
+    public void shouldReturnFalseIfNonFatal() {
+        assertFalse(exceptionHandler.consumeIfFatal(new 
IndexOutOfBoundsException(), (err) -> {}));
+    }
+
+    @Test
+    public void shouldConsumeMappedExceptionIfFatal() {
+        Set<Exception> consumedExceptions = new HashSet<>();
+        assertTrue(
+                exceptionHandler.consumeIfFatal(
+                        new UnsupportedOperationException(), 
consumedExceptions::add));
+
+        assertEquals(1, consumedExceptions.size());
+        assertTrue(consumedExceptions.contains(mappedException));
+    }
+
+    @Test
+    public void shouldNotConsumeMappedExceptionIfNonFatal() {
+        assertFalse(
+                exceptionHandler.consumeIfFatal(
+                        new IndexOutOfBoundsException(),
+                        // consumer should not be called
+                        (err) -> assertTrue(false)));
+    }
+}
diff --git 
a/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/AWSFirehoseExceptionClassifiers.java
 
b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/AWSFirehoseExceptionClassifiers.java
new file mode 100644
index 0000000..80fe36d
--- /dev/null
+++ 
b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/AWSFirehoseExceptionClassifiers.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.connector.aws.sink.throwable.AWSExceptionClassifierUtil;
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+
+import software.amazon.awssdk.services.firehose.model.FirehoseException;
+import 
software.amazon.awssdk.services.firehose.model.ResourceNotFoundException;
+
+/**
+ * Class containing set of {@link FatalExceptionClassifier} for {@link
+ * software.amazon.awssdk.services.firehose.model.FirehoseException}.
+ */
+@Internal
+public class AWSFirehoseExceptionClassifiers {
+
+    public static FatalExceptionClassifier 
getNotAuthorizedExceptionClassifier() {
+        return AWSExceptionClassifierUtil.withAWSServiceErrorCode(
+                FirehoseException.class,
+                "NotAuthorized",
+                err ->
+                        new KinesisFirehoseException(
+                                "Encountered non-recoverable exception: 
NotAuthorized", err));
+    }
+
+    public static FatalExceptionClassifier 
getAccessDeniedExceptionClassifier() {
+        return AWSExceptionClassifierUtil.withAWSServiceErrorCode(
+                FirehoseException.class,
+                "AccessDeniedException",
+                err ->
+                        new KinesisFirehoseException(
+                                "Encountered non-recoverable exception: 
AccessDeniedException",
+                                err));
+    }
+
+    public static FatalExceptionClassifier 
getResourceNotFoundExceptionClassifier() {
+        return FatalExceptionClassifier.withRootCauseOfType(
+                ResourceNotFoundException.class,
+                err ->
+                        new KinesisFirehoseException(
+                                "Encountered non-recoverable exception 
relating to not being able to find the specified resources",
+                                err));
+    }
+}
diff --git 
a/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java
 
b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java
index e24c694..15cd799 100644
--- 
a/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java
+++ 
b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java
@@ -19,6 +19,7 @@ package org.apache.flink.connector.firehose.sink;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler;
 import org.apache.flink.connector.aws.util.AWSClientUtil;
 import org.apache.flink.connector.aws.util.AWSGeneralUtil;
 import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
@@ -37,7 +38,6 @@ import 
software.amazon.awssdk.services.firehose.model.PutRecordBatchRequest;
 import software.amazon.awssdk.services.firehose.model.PutRecordBatchResponse;
 import 
software.amazon.awssdk.services.firehose.model.PutRecordBatchResponseEntry;
 import software.amazon.awssdk.services.firehose.model.Record;
-import 
software.amazon.awssdk.services.firehose.model.ResourceNotFoundException;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -81,20 +81,16 @@ class KinesisFirehoseSinkWriter<InputT> extends 
AsyncSinkWriter<InputT, Record>
                 
KinesisFirehoseConfigConstants.FIREHOSE_CLIENT_USER_AGENT_PREFIX);
     }
 
-    private static final FatalExceptionClassifier 
RESOURCE_NOT_FOUND_EXCEPTION_CLASSIFIER =
-            FatalExceptionClassifier.withRootCauseOfType(
-                    ResourceNotFoundException.class,
-                    err ->
-                            new KinesisFirehoseException(
-                                    "Encountered non-recoverable exception 
relating to not being able to find the specified resources",
-                                    err));
-
-    private static final FatalExceptionClassifier 
FIREHOSE_FATAL_EXCEPTION_CLASSIFIER =
-            FatalExceptionClassifier.createChain(
-                    getInterruptedExceptionClassifier(),
-                    getInvalidCredentialsExceptionClassifier(),
-                    RESOURCE_NOT_FOUND_EXCEPTION_CLASSIFIER,
-                    getSdkClientMisconfiguredExceptionClassifier());
+    private static final AWSExceptionHandler FIREHOSE_EXCEPTION_HANDLER =
+            AWSExceptionHandler.withClassifier(
+                    FatalExceptionClassifier.createChain(
+                            getInterruptedExceptionClassifier(),
+                            getInvalidCredentialsExceptionClassifier(),
+                            AWSFirehoseExceptionClassifiers
+                                    .getResourceNotFoundExceptionClassifier(),
+                            
AWSFirehoseExceptionClassifiers.getAccessDeniedExceptionClassifier(),
+                            
AWSFirehoseExceptionClassifiers.getNotAuthorizedExceptionClassifier(),
+                            getSdkClientMisconfiguredExceptionClassifier()));
 
     private final Counter numRecordsOutErrorsCounter;
 
@@ -210,33 +206,42 @@ class KinesisFirehoseSinkWriter<InputT> extends 
AsyncSinkWriter<InputT, Record>
 
     private void handleFullyFailedRequest(
             Throwable err, List<Record> requestEntries, Consumer<List<Record>> 
requestResult) {
-        LOG.debug(
+
+        numRecordsOutErrorsCounter.inc(requestEntries.size());
+        boolean isFatal = FIREHOSE_EXCEPTION_HANDLER.consumeIfFatal(err, 
getFatalExceptionCons());
+        if (isFatal) {
+            return;
+        }
+
+        if (failOnError) {
+            getFatalExceptionCons()
+                    .accept(new 
KinesisFirehoseException.KinesisFirehoseFailFastException(err));
+            return;
+        }
+
+        LOG.warn(
                 "KDF Sink failed to write and will retry {} entries to KDF 
first request was {}",
                 requestEntries.size(),
                 requestEntries.get(0).toString(),
                 err);
-        numRecordsOutErrorsCounter.inc(requestEntries.size());
-
-        if (isRetryable(err)) {
-            requestResult.accept(requestEntries);
-        }
+        requestResult.accept(requestEntries);
     }
 
     private void handlePartiallyFailedRequest(
             PutRecordBatchResponse response,
             List<Record> requestEntries,
             Consumer<List<Record>> requestResult) {
-        LOG.debug(
-                "KDF Sink failed to write and will retry {} entries to KDF 
first request was {}",
-                requestEntries.size(),
-                requestEntries.get(0).toString());
         numRecordsOutErrorsCounter.inc(response.failedPutCount());
-
         if (failOnError) {
             getFatalExceptionCons()
                     .accept(new 
KinesisFirehoseException.KinesisFirehoseFailFastException());
             return;
         }
+
+        LOG.debug(
+                "KDF Sink failed to write and will retry {} entries to KDF 
first request was {}",
+                requestEntries.size(),
+                requestEntries.get(0).toString());
         List<Record> failedRequestEntries = new 
ArrayList<>(response.failedPutCount());
         List<PutRecordBatchResponseEntry> records = 
response.requestResponses();
 
@@ -248,17 +253,4 @@ class KinesisFirehoseSinkWriter<InputT> extends 
AsyncSinkWriter<InputT, Record>
 
         requestResult.accept(failedRequestEntries);
     }
-
-    private boolean isRetryable(Throwable err) {
-        if (!FIREHOSE_FATAL_EXCEPTION_CLASSIFIER.isFatal(err, 
getFatalExceptionCons())) {
-            return false;
-        }
-        if (failOnError) {
-            getFatalExceptionCons()
-                    .accept(new 
KinesisFirehoseException.KinesisFirehoseFailFastException(err));
-            return false;
-        }
-
-        return true;
-    }
 }
diff --git 
a/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/AWSFirehoseExceptionClassifiersTest.java
 
b/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/AWSFirehoseExceptionClassifiersTest.java
new file mode 100644
index 0000000..791021e
--- /dev/null
+++ 
b/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/AWSFirehoseExceptionClassifiersTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.services.firehose.model.FirehoseException;
+import 
software.amazon.awssdk.services.firehose.model.ResourceNotFoundException;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+/** Unit tests for {@link AWSFirehoseExceptionClassifiers}. */
+public class AWSFirehoseExceptionClassifiersTest {
+
+    private final FatalExceptionClassifier classifier =
+            FatalExceptionClassifier.createChain(
+                    
AWSFirehoseExceptionClassifiers.getAccessDeniedExceptionClassifier(),
+                    
AWSFirehoseExceptionClassifiers.getNotAuthorizedExceptionClassifier(),
+                    
AWSFirehoseExceptionClassifiers.getResourceNotFoundExceptionClassifier());
+
+    @Test
+    public void shouldClassifyNotAuthorizedAsFatal() {
+        AwsServiceException firehoseException =
+                FirehoseException.builder()
+                        .awsErrorDetails(
+                                
AwsErrorDetails.builder().errorCode("NotAuthorized").build())
+                        .build();
+
+        // isFatal returns `true` if an exception is non-fatal
+        assertFalse(classifier.isFatal(firehoseException, ex -> {}));
+    }
+
+    @Test
+    public void shouldClassifyAccessDeniedExceptionAsFatal() {
+        AwsServiceException firehoseException =
+                FirehoseException.builder()
+                        .awsErrorDetails(
+                                AwsErrorDetails.builder()
+                                        .errorCode("AccessDeniedException")
+                                        .build())
+                        .build();
+
+        // isFatal returns `true` if an exception is non-fatal
+        assertFalse(classifier.isFatal(firehoseException, ex -> {}));
+    }
+
+    @Test
+    public void shouldClassifyResourceNotFoundAsFatal() {
+        AwsServiceException firehoseException = 
ResourceNotFoundException.builder().build();
+
+        // isFatal returns `true` if an exception is non-fatal
+        assertFalse(classifier.isFatal(firehoseException, ex -> {}));
+    }
+}

Reply via email to