dannycranmer commented on a change in pull request #18553: URL: https://github.com/apache/flink/pull/18553#discussion_r796596506
########## File path: flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java ########## @@ -56,6 +62,61 @@ class KinesisDataStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRecordsRequestEntry> { private static final Logger LOG = LoggerFactory.getLogger(KinesisDataStreamsSinkWriter.class); + private static final RetryValidationStrategy INVALID_CREDENTIALS_STRATEGY = + new RetryValidationStrategy( + err -> ExceptionUtils.findThrowable(err, StsException.class).isPresent(), Review comment: nit: This is much better, but still duplicating logic. You could sub `RetryValidationStrategy` with a `RootCauseRetryValidationStrategy` that wraps up the `ExceptionUtils.findThrowable(` and you simply pass the class to look for ########## File path: flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java ########## @@ -56,6 +62,61 @@ class KinesisDataStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRecordsRequestEntry> { private static final Logger LOG = LoggerFactory.getLogger(KinesisDataStreamsSinkWriter.class); + private static final RetryValidationStrategy INVALID_CREDENTIALS_STRATEGY = + new RetryValidationStrategy( + err -> ExceptionUtils.findThrowable(err, StsException.class).isPresent(), + err -> + new KinesisDataStreamsException( + "Encountered non-recoverable exception relating to the provided credentials.", + err)); + private static final RetryValidationStrategy RESOURCE_NOT_FOUND_STRATEGY = + new RetryValidationStrategy( + err -> + ExceptionUtils.findThrowable(err, ResourceNotFoundException.class) + .isPresent(), + err -> + new KinesisDataStreamsException( + "Encountered non-recoverable exception relating to not being able to find the specified resources", + err)); + private static final RetryValidationStrategy SDK_CLIENT_MISCONFIGURED_STRATEGY = + new 
RetryValidationStrategy( + err -> ExceptionUtils.findThrowable(err, SdkClientException.class).isPresent(), + err -> + new KinesisDataStreamsException( + "Encountered non-recoverable exception relating to mis-configured client", + err)); + private static final RetryValidationStrategy MISSING_ACCESS_KEY_ID_STRATEGY = + new RetryValidationStrategy( + err -> + ExceptionUtils.findThrowableWithMessage( + err, "Access key ID cannot be blank.") + .isPresent() + || ExceptionUtils.findThrowableWithMessage( + err, + "Either the environment variable AWS_WEB_IDENTITY_TOKEN_FILE or the javaproperty aws.webIdentityTokenFile must be set.") + .isPresent(), Review comment: Looking for messages is a bit fragile. Updating the AWS SDK could break this, and I doubt we would catch it in the test. Is there another way we can classify this? ########## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkRetryValidationStrategies.java ########## @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.base.sink.writer; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; + +/** Common retry strategies needed to fail fast on common errors. */ +@Internal +public class AsyncSinkRetryValidationStrategies { + public static final RetryValidationStrategy INTERRUPTED_STRATEGY = + new RetryValidationStrategy( + err -> + ExceptionUtils.findThrowable(err, InterruptedException.class) + .isPresent(), + err -> new FlinkException("Running job was cancelled")); Review comment: > Running job was cancelled Are we confident enough to make this claim here? I do not think a Validation Strategy should know about Flink jobs. We can just more generally say "Thread was interrupted". ########## File path: flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java ########## @@ -56,6 +62,61 @@ class KinesisDataStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRecordsRequestEntry> { private static final Logger LOG = LoggerFactory.getLogger(KinesisDataStreamsSinkWriter.class); + private static final RetryValidationStrategy INVALID_CREDENTIALS_STRATEGY = + new RetryValidationStrategy( + err -> ExceptionUtils.findThrowable(err, StsException.class).isPresent(), + err -> + new KinesisDataStreamsException( + "Encountered non-recoverable exception relating to the provided credentials.", + err)); + private static final RetryValidationStrategy RESOURCE_NOT_FOUND_STRATEGY = + new RetryValidationStrategy( + err -> + ExceptionUtils.findThrowable(err, ResourceNotFoundException.class) + .isPresent(), + err -> + new KinesisDataStreamsException( + "Encountered non-recoverable exception relating to not being able to find the specified resources", + err)); + private static final RetryValidationStrategy SDK_CLIENT_MISCONFIGURED_STRATEGY = + new RetryValidationStrategy( + err -> 
ExceptionUtils.findThrowable(err, SdkClientException.class).isPresent(), + err -> + new KinesisDataStreamsException( + "Encountered non-recoverable exception relating to mis-configured client", + err)); + private static final RetryValidationStrategy MISSING_ACCESS_KEY_ID_STRATEGY = + new RetryValidationStrategy( + err -> + ExceptionUtils.findThrowableWithMessage( + err, "Access key ID cannot be blank.") + .isPresent() + || ExceptionUtils.findThrowableWithMessage( + err, + "Either the environment variable AWS_WEB_IDENTITY_TOKEN_FILE or the javaproperty aws.webIdentityTokenFile must be set.") + .isPresent(), + err -> + new KinesisDataStreamsException( + "Encountered non-recoverable exception relating to missing credentials", + err)); + private static final RetryValidationStrategy NON_RECOVERABLE_EXCEPTION_STRATEGY = + new RetryValidationStrategy( + err -> ExceptionUtils.findThrowable(err, Error.class).isPresent(), + err -> + new KinesisDataStreamsException( + "Encountered non-recoverable exception in the Kinesis Data Streams Sink", + err)); Review comment: Same as above, can this be pulled into a module it can be reused from? ########## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/RetryValidationStrategy.java ########## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.sink.writer; + +import org.apache.flink.annotation.Internal; + +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Predicate; + +/** Class acting as a classifier for exception by failed requests to be retried by sink. */ +@Internal +public class RetryValidationStrategy { Review comment: The word `Validation` in this class is confusing. Seems like it is a strategy determining when to retry validation. Suggest we make it more general `RetryStrategy` or maybe `ExceptionClassifier` but make sure there are no clashes with existing classes. 
This class is in the `sink` package but it looks more general than sink ########## File path: flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java ########## @@ -56,6 +62,61 @@ class KinesisDataStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRecordsRequestEntry> { private static final Logger LOG = LoggerFactory.getLogger(KinesisDataStreamsSinkWriter.class); + private static final RetryValidationStrategy INVALID_CREDENTIALS_STRATEGY = + new RetryValidationStrategy( + err -> ExceptionUtils.findThrowable(err, StsException.class).isPresent(), + err -> + new KinesisDataStreamsException( + "Encountered non-recoverable exception relating to the provided credentials.", + err)); + private static final RetryValidationStrategy RESOURCE_NOT_FOUND_STRATEGY = + new RetryValidationStrategy( + err -> + ExceptionUtils.findThrowable(err, ResourceNotFoundException.class) + .isPresent(), + err -> + new KinesisDataStreamsException( + "Encountered non-recoverable exception relating to not being able to find the specified resources", + err)); + private static final RetryValidationStrategy SDK_CLIENT_MISCONFIGURED_STRATEGY = + new RetryValidationStrategy( + err -> ExceptionUtils.findThrowable(err, SdkClientException.class).isPresent(), + err -> + new KinesisDataStreamsException( + "Encountered non-recoverable exception relating to mis-configured client", + err)); Review comment: This seems like it should be a class, that can be reused by Firehose sink, and other AWS sinks ########## File path: flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/RetryValidationStrategyTest.java ########## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.sink.writer; + +import org.apache.flink.util.ExceptionUtils; + +import org.junit.jupiter.api.Test; + +import java.util.concurrent.atomic.AtomicReference; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +/** Tests the RetryValidationStrategy of the Async Sink Writer. */ +public class RetryValidationStrategyTest { + + private static Integer nullReference; + + private static final RetryValidationStrategy ARITHMETIC_EXCEPTION_STRATEGY = + new RetryValidationStrategy( + err -> ExceptionUtils.findThrowable(err, ArithmeticException.class).isPresent(), + err -> + new RuntimeException( + "Buffer manipulation calculations resulted in a calculation exception", + err)); + + private static final RetryValidationStrategy NULL_POINTER_EXCEPTION_STRATEGY = + new RetryValidationStrategy( + err -> + ExceptionUtils.findThrowable(err, NullPointerException.class) + .isPresent(), + err -> + new RuntimeException( + "Buffer manipulation calculations resulted in a reference exception", + err)); + + @Test + public void exceptionsAreWrappedInTheContainingExceptionWhenAMatchIsFound() { + AtomicReference<Exception> caughtExceptionReference = new AtomicReference<>(); + try { + int failedCalculation = 100 / 0; + } catch (Exception e) { + ARITHMETIC_EXCEPTION_STRATEGY.shouldRetry(e, caughtExceptionReference::set); Review comment: nit: Why not just 
instantiate and pass the exception in? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org