This is an automated email from the ASF dual-hosted git repository. dannycranmer pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git
The following commit(s) were added to refs/heads/main by this push:
     new d166ee2  [FLINK-31492][Connectors/AWS] Add support for exception classification based on AWS error codes
d166ee2 is described below

commit d166ee24bdd2b238f1d909912ec1d038732ec1c4
Author: Samuel Siebenmann <siebe...@amazon.com>
AuthorDate: Wed Mar 22 14:19:12 2023 +0000

    [FLINK-31492][Connectors/AWS] Add support for exception classification based on AWS error codes

    - Add AWSExceptionClassifierUtil to create FatalExceptionClassifiers to classify AWSServiceExceptions based on AWSErrorDetails error code.
    - Add AWSExceptionHandler to provide improved semantics over FatalExceptionClassifier.
    - Move Firehose sink exception classifiers into their own class: AWSFirehoseExceptionClassifiers.
    - Add FatalExceptionClassifiers for FirehoseExceptions with errorCode "NotAuthorized" and "AccessDeniedException".
    - Update KinesisFirehoseSinkWriter to use new classifiers.
    - Update KinesisFirehoseSinkWriter to use AWSExceptionHandler for better readability.
--- .../sink/throwable/AWSExceptionClassifierUtil.java | 68 +++++++++++++ .../aws/sink/throwable/AWSExceptionHandler.java | 55 ++++++++++ .../throwable/AWSExceptionClassifierUtilTest.java | 111 +++++++++++++++++++++ .../sink/throwable/AWSExceptionHandlerTest.java | 72 +++++++++++++ .../sink/AWSFirehoseExceptionClassifiers.java | 61 +++++++++++ .../firehose/sink/KinesisFirehoseSinkWriter.java | 70 ++++++------- .../sink/AWSFirehoseExceptionClassifiersTest.java | 72 +++++++++++++ 7 files changed, 470 insertions(+), 39 deletions(-) diff --git a/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/sink/throwable/AWSExceptionClassifierUtil.java b/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/sink/throwable/AWSExceptionClassifierUtil.java new file mode 100644 index 0000000..f772678 --- /dev/null +++ b/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/sink/throwable/AWSExceptionClassifierUtil.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.aws.sink.throwable; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier; +import org.apache.flink.util.ExceptionUtils; + +import software.amazon.awssdk.awscore.exception.AwsErrorDetails; +import software.amazon.awssdk.awscore.exception.AwsServiceException; + +import java.util.Objects; +import java.util.Optional; +import java.util.function.Function; + +/** + * Util class to create {@link FatalExceptionClassifier} to classify {@link AwsServiceException} + * based on {@link AwsErrorDetails#errorCode()}. + */ +@Internal +public class AWSExceptionClassifierUtil { + + /** + * Creates a {@link FatalExceptionClassifier} that classifies an exception as fatal if a given + * exception contains an {@link AwsServiceException} of type serviceExceptionType and {@link + * AwsErrorDetails#errorCode()} errorCode. + * + * @param serviceExceptionType The specific {@link AwsServiceException} to look for in the + * exception. + * @param errorCode The {@link AwsErrorDetails#errorCode()} for the passed serviceExceptionType. + * @param mapper The exception mapper to be used by the returned {@link AWSExceptionHandler}. + * @return A {@link FatalExceptionClassifier} classifying based on exception type and error + * code. + */ + public static FatalExceptionClassifier withAWSServiceErrorCode( + Class<? extends AwsServiceException> serviceExceptionType, + String errorCode, + Function<Throwable, Exception> mapper) { + return new FatalExceptionClassifier( + (err) -> { + Optional<? 
extends AwsServiceException> exceptionOptional = + ExceptionUtils.findThrowable(err, serviceExceptionType); + if (!exceptionOptional.isPresent()) { + return false; + } + + AwsServiceException exception = exceptionOptional.get(); + return exception.awsErrorDetails() != null + && Objects.equals(errorCode, exception.awsErrorDetails().errorCode()); + }, + mapper); + } +} diff --git a/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/sink/throwable/AWSExceptionHandler.java b/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/sink/throwable/AWSExceptionHandler.java new file mode 100644 index 0000000..97671ba --- /dev/null +++ b/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/sink/throwable/AWSExceptionHandler.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.aws.sink.throwable; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier; + +import java.util.function.Consumer; + +/** + * Class to provide improved semantics over {@link FatalExceptionClassifier#isFatal(Throwable, + * Consumer)}. 
{@link FatalExceptionClassifier#isFatal(Throwable, Consumer)} returns `false` for a + * fatal exception and `true` for a non-fatal exception. + */ +@Internal +public class AWSExceptionHandler { + + private final FatalExceptionClassifier classifier; + + private AWSExceptionHandler(FatalExceptionClassifier classifier) { + this.classifier = classifier; + } + + public static AWSExceptionHandler withClassifier(FatalExceptionClassifier classifier) { + return new AWSExceptionHandler(classifier); + } + + /** + * Passes a given {@link Throwable} t to a given {@link Consumer} consumer if the throwable is + * fatal. Returns `true` if the {@link Throwable} has been passed to the {@link Consumer} (i.e. + * it is fatal) and `false` otherwise. + * + * @param t a {@link Throwable} + * @param consumer a {@link Consumer} to call if the passed throwable t is fatal. + * @return `true` if t is fatal, `false` otherwise. + */ + public boolean consumeIfFatal(Throwable t, Consumer<Exception> consumer) { + return !classifier.isFatal(t, consumer); + } +} diff --git a/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/sink/throwable/AWSExceptionClassifierUtilTest.java b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/sink/throwable/AWSExceptionClassifierUtilTest.java new file mode 100644 index 0000000..34b6fec --- /dev/null +++ b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/sink/throwable/AWSExceptionClassifierUtilTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.aws.sink.throwable; + +import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier; + +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.awscore.exception.AwsErrorDetails; +import software.amazon.awssdk.awscore.exception.AwsServiceException; +import software.amazon.awssdk.services.sts.model.StsException; + +import java.util.HashSet; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** Unit tests for {@link AWSExceptionClassifierUtil}. 
*/ +public class AWSExceptionClassifierUtilTest { + + @Test + public void shouldCreateFatalExceptionClassifierThatClassifiesAsFatalIfMatchingErrorCode() { + // given + AwsServiceException exception = + StsException.builder() + .awsErrorDetails( + AwsErrorDetails.builder() + .errorCode("NotAuthorizedException") + .build()) + .build(); + + FatalExceptionClassifier classifier = + AWSExceptionClassifierUtil.withAWSServiceErrorCode( + StsException.class, + "NotAuthorizedException", + (err) -> new RuntimeException()); + + // when (FatalExceptionClassifier#isFatal returns false if exception is fatal) + boolean isFatal = !classifier.isFatal(exception, (err) -> {}); + + // then + assertTrue(isFatal); + } + + @Test + public void + shouldCreateFatalExceptionClassifierThatClassifiesAsNonFatalIfNotMatchingErrorCode() { + // given + AwsServiceException exception = + StsException.builder() + .awsErrorDetails( + AwsErrorDetails.builder().errorCode("SomeOtherException").build()) + .build(); + + FatalExceptionClassifier classifier = + AWSExceptionClassifierUtil.withAWSServiceErrorCode( + StsException.class, + "NotAuthorizedException", + (err) -> new RuntimeException()); + + // when (FatalExceptionClassifier#isFatal returns true if exception is non-fatal) + boolean isFatal = !classifier.isFatal(exception, (err) -> {}); + + // then + assertFalse(isFatal); + } + + @Test + public void shouldCreateFatalExceptionClassifierThatAppliesThrowableMapper() { + // given + AwsServiceException exception = + StsException.builder() + .awsErrorDetails( + AwsErrorDetails.builder() + .errorCode("NotAuthorizedException") + .build()) + .build(); + + Exception mappedException = + new RuntimeException( + "shouldCreateFatalExceptionClassifierThatAppliesThrowableMapper"); + FatalExceptionClassifier classifier = + AWSExceptionClassifierUtil.withAWSServiceErrorCode( + StsException.class, "NotAuthorizedException", (err) -> mappedException); + + Set<Exception> consumedExceptions = new HashSet<>(); + + // when + 
classifier.isFatal(exception, consumedExceptions::add); + + // then mappedException has been consumed + assertEquals(1, consumedExceptions.size()); + assertTrue(consumedExceptions.contains(mappedException)); + } +} diff --git a/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/sink/throwable/AWSExceptionHandlerTest.java b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/sink/throwable/AWSExceptionHandlerTest.java new file mode 100644 index 0000000..c53cd37 --- /dev/null +++ b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/sink/throwable/AWSExceptionHandlerTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.aws.sink.throwable; + +import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier; + +import org.junit.jupiter.api.Test; + +import java.util.HashSet; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** Unit tests for {@link AWSExceptionHandler}. 
*/ +public class AWSExceptionHandlerTest { + + private final RuntimeException mappedException = + new RuntimeException("AWSExceptionHandlerTest"); + + private final AWSExceptionHandler exceptionHandler = + AWSExceptionHandler.withClassifier( + FatalExceptionClassifier.withRootCauseOfType( + UnsupportedOperationException.class, (err) -> mappedException)); + + @Test + public void shouldReturnTrueIfFatal() { + assertTrue( + exceptionHandler.consumeIfFatal(new UnsupportedOperationException(), (err) -> {})); + } + + @Test + public void shouldReturnFalseIfNonFatal() { + assertFalse(exceptionHandler.consumeIfFatal(new IndexOutOfBoundsException(), (err) -> {})); + } + + @Test + public void shouldConsumeMappedExceptionIfFatal() { + Set<Exception> consumedExceptions = new HashSet<>(); + assertTrue( + exceptionHandler.consumeIfFatal( + new UnsupportedOperationException(), consumedExceptions::add)); + + assertEquals(1, consumedExceptions.size()); + assertTrue(consumedExceptions.contains(mappedException)); + } + + @Test + public void shouldNotConsumeMappedExceptionIfNonFatal() { + assertFalse( + exceptionHandler.consumeIfFatal( + new IndexOutOfBoundsException(), + // consumer should not be called + (err) -> assertTrue(false))); + } +} diff --git a/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/AWSFirehoseExceptionClassifiers.java b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/AWSFirehoseExceptionClassifiers.java new file mode 100644 index 0000000..80fe36d --- /dev/null +++ b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/AWSFirehoseExceptionClassifiers.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.firehose.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.aws.sink.throwable.AWSExceptionClassifierUtil; +import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier; + +import software.amazon.awssdk.services.firehose.model.FirehoseException; +import software.amazon.awssdk.services.firehose.model.ResourceNotFoundException; + +/** + * Class containing set of {@link FatalExceptionClassifier} for {@link + * software.amazon.awssdk.services.firehose.model.FirehoseException}. 
+ */ +@Internal +public class AWSFirehoseExceptionClassifiers { + + public static FatalExceptionClassifier getNotAuthorizedExceptionClassifier() { + return AWSExceptionClassifierUtil.withAWSServiceErrorCode( + FirehoseException.class, + "NotAuthorized", + err -> + new KinesisFirehoseException( + "Encountered non-recoverable exception: NotAuthorized", err)); + } + + public static FatalExceptionClassifier getAccessDeniedExceptionClassifier() { + return AWSExceptionClassifierUtil.withAWSServiceErrorCode( + FirehoseException.class, + "AccessDeniedException", + err -> + new KinesisFirehoseException( + "Encountered non-recoverable exception: AccessDeniedException", + err)); + } + + public static FatalExceptionClassifier getResourceNotFoundExceptionClassifier() { + return FatalExceptionClassifier.withRootCauseOfType( + ResourceNotFoundException.class, + err -> + new KinesisFirehoseException( + "Encountered non-recoverable exception relating to not being able to find the specified resources", + err)); + } +} diff --git a/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java index e24c694..15cd799 100644 --- a/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java +++ b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java @@ -19,6 +19,7 @@ package org.apache.flink.connector.firehose.sink; import org.apache.flink.annotation.Internal; import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler; import org.apache.flink.connector.aws.util.AWSClientUtil; import org.apache.flink.connector.aws.util.AWSGeneralUtil; import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier; 
@@ -37,7 +38,6 @@ import software.amazon.awssdk.services.firehose.model.PutRecordBatchRequest; import software.amazon.awssdk.services.firehose.model.PutRecordBatchResponse; import software.amazon.awssdk.services.firehose.model.PutRecordBatchResponseEntry; import software.amazon.awssdk.services.firehose.model.Record; -import software.amazon.awssdk.services.firehose.model.ResourceNotFoundException; import java.util.ArrayList; import java.util.Collection; @@ -81,20 +81,16 @@ class KinesisFirehoseSinkWriter<InputT> extends AsyncSinkWriter<InputT, Record> KinesisFirehoseConfigConstants.FIREHOSE_CLIENT_USER_AGENT_PREFIX); } - private static final FatalExceptionClassifier RESOURCE_NOT_FOUND_EXCEPTION_CLASSIFIER = - FatalExceptionClassifier.withRootCauseOfType( - ResourceNotFoundException.class, - err -> - new KinesisFirehoseException( - "Encountered non-recoverable exception relating to not being able to find the specified resources", - err)); - - private static final FatalExceptionClassifier FIREHOSE_FATAL_EXCEPTION_CLASSIFIER = - FatalExceptionClassifier.createChain( - getInterruptedExceptionClassifier(), - getInvalidCredentialsExceptionClassifier(), - RESOURCE_NOT_FOUND_EXCEPTION_CLASSIFIER, - getSdkClientMisconfiguredExceptionClassifier()); + private static final AWSExceptionHandler FIREHOSE_EXCEPTION_HANDLER = + AWSExceptionHandler.withClassifier( + FatalExceptionClassifier.createChain( + getInterruptedExceptionClassifier(), + getInvalidCredentialsExceptionClassifier(), + AWSFirehoseExceptionClassifiers + .getResourceNotFoundExceptionClassifier(), + AWSFirehoseExceptionClassifiers.getAccessDeniedExceptionClassifier(), + AWSFirehoseExceptionClassifiers.getNotAuthorizedExceptionClassifier(), + getSdkClientMisconfiguredExceptionClassifier())); private final Counter numRecordsOutErrorsCounter; @@ -210,33 +206,42 @@ class KinesisFirehoseSinkWriter<InputT> extends AsyncSinkWriter<InputT, Record> private void handleFullyFailedRequest( Throwable err, List<Record> 
requestEntries, Consumer<List<Record>> requestResult) { - LOG.debug( + + numRecordsOutErrorsCounter.inc(requestEntries.size()); + boolean isFatal = FIREHOSE_EXCEPTION_HANDLER.consumeIfFatal(err, getFatalExceptionCons()); + if (isFatal) { + return; + } + + if (failOnError) { + getFatalExceptionCons() + .accept(new KinesisFirehoseException.KinesisFirehoseFailFastException(err)); + return; + } + + LOG.warn( "KDF Sink failed to write and will retry {} entries to KDF first request was {}", requestEntries.size(), requestEntries.get(0).toString(), err); - numRecordsOutErrorsCounter.inc(requestEntries.size()); - - if (isRetryable(err)) { - requestResult.accept(requestEntries); - } + requestResult.accept(requestEntries); } private void handlePartiallyFailedRequest( PutRecordBatchResponse response, List<Record> requestEntries, Consumer<List<Record>> requestResult) { - LOG.debug( - "KDF Sink failed to write and will retry {} entries to KDF first request was {}", - requestEntries.size(), - requestEntries.get(0).toString()); numRecordsOutErrorsCounter.inc(response.failedPutCount()); - if (failOnError) { getFatalExceptionCons() .accept(new KinesisFirehoseException.KinesisFirehoseFailFastException()); return; } + + LOG.debug( + "KDF Sink failed to write and will retry {} entries to KDF first request was {}", + requestEntries.size(), + requestEntries.get(0).toString()); List<Record> failedRequestEntries = new ArrayList<>(response.failedPutCount()); List<PutRecordBatchResponseEntry> records = response.requestResponses(); @@ -248,17 +253,4 @@ class KinesisFirehoseSinkWriter<InputT> extends AsyncSinkWriter<InputT, Record> requestResult.accept(failedRequestEntries); } - - private boolean isRetryable(Throwable err) { - if (!FIREHOSE_FATAL_EXCEPTION_CLASSIFIER.isFatal(err, getFatalExceptionCons())) { - return false; - } - if (failOnError) { - getFatalExceptionCons() - .accept(new KinesisFirehoseException.KinesisFirehoseFailFastException(err)); - return false; - } - - return true; - } } 
diff --git a/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/AWSFirehoseExceptionClassifiersTest.java b/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/AWSFirehoseExceptionClassifiersTest.java new file mode 100644 index 0000000..791021e --- /dev/null +++ b/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/AWSFirehoseExceptionClassifiersTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.firehose.sink; + +import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier; + +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.awscore.exception.AwsErrorDetails; +import software.amazon.awssdk.awscore.exception.AwsServiceException; +import software.amazon.awssdk.services.firehose.model.FirehoseException; +import software.amazon.awssdk.services.firehose.model.ResourceNotFoundException; + +import static org.junit.jupiter.api.Assertions.assertFalse; + +/** Unit tests for {@link AWSFirehoseExceptionClassifiers}. 
*/ +public class AWSFirehoseExceptionClassifiersTest { + + private final FatalExceptionClassifier classifier = + FatalExceptionClassifier.createChain( + AWSFirehoseExceptionClassifiers.getAccessDeniedExceptionClassifier(), + AWSFirehoseExceptionClassifiers.getNotAuthorizedExceptionClassifier(), + AWSFirehoseExceptionClassifiers.getResourceNotFoundExceptionClassifier()); + + @Test + public void shouldClassifyNotAuthorizedAsFatal() { + AwsServiceException firehoseException = + FirehoseException.builder() + .awsErrorDetails( + AwsErrorDetails.builder().errorCode("NotAuthorized").build()) + .build(); + + // isFatal returns `true` if an exception is non-fatal + assertFalse(classifier.isFatal(firehoseException, ex -> {})); + } + + @Test + public void shouldClassifyAccessDeniedExceptionAsFatal() { + AwsServiceException firehoseException = + FirehoseException.builder() + .awsErrorDetails( + AwsErrorDetails.builder() + .errorCode("AccessDeniedException") + .build()) + .build(); + + // isFatal returns `true` if an exception is non-fatal + assertFalse(classifier.isFatal(firehoseException, ex -> {})); + } + + @Test + public void shouldClassifyResourceNotFoundAsFatal() { + AwsServiceException firehoseException = ResourceNotFoundException.builder().build(); + + // isFatal returns `true` if an exception is non-fatal + assertFalse(classifier.isFatal(firehoseException, ex -> {})); + } +}