dannycranmer commented on a change in pull request #18553:
URL: https://github.com/apache/flink/pull/18553#discussion_r796596506



##########
File path: 
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java
##########
@@ -56,6 +62,61 @@
 class KinesisDataStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, 
PutRecordsRequestEntry> {
     private static final Logger LOG = 
LoggerFactory.getLogger(KinesisDataStreamsSinkWriter.class);
 
+    private static final RetryValidationStrategy INVALID_CREDENTIALS_STRATEGY =
+            new RetryValidationStrategy(
+                    err -> ExceptionUtils.findThrowable(err, 
StsException.class).isPresent(),

Review comment:
       nit: This is much better, but still duplicating logic. You could sub 
`RetryValidationStrategy` with a `RootCauseRetryValidationStrategy` that wraps 
up the `ExceptionUtils.findThrowable(` and you simply pass the class to look for

##########
File path: 
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java
##########
@@ -56,6 +62,61 @@
 class KinesisDataStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, 
PutRecordsRequestEntry> {
     private static final Logger LOG = 
LoggerFactory.getLogger(KinesisDataStreamsSinkWriter.class);
 
+    private static final RetryValidationStrategy INVALID_CREDENTIALS_STRATEGY =
+            new RetryValidationStrategy(
+                    err -> ExceptionUtils.findThrowable(err, 
StsException.class).isPresent(),
+                    err ->
+                            new KinesisDataStreamsException(
+                                    "Encountered non-recoverable exception 
relating to the provided credentials.",
+                                    err));
+    private static final RetryValidationStrategy RESOURCE_NOT_FOUND_STRATEGY =
+            new RetryValidationStrategy(
+                    err ->
+                            ExceptionUtils.findThrowable(err, 
ResourceNotFoundException.class)
+                                    .isPresent(),
+                    err ->
+                            new KinesisDataStreamsException(
+                                    "Encountered non-recoverable exception 
relating to not being able to find the specified resources",
+                                    err));
+    private static final RetryValidationStrategy 
SDK_CLIENT_MISCONFIGURED_STRATEGY =
+            new RetryValidationStrategy(
+                    err -> ExceptionUtils.findThrowable(err, 
SdkClientException.class).isPresent(),
+                    err ->
+                            new KinesisDataStreamsException(
+                                    "Encountered non-recoverable exception 
relating to mis-configured client",
+                                    err));
+    private static final RetryValidationStrategy 
MISSING_ACCESS_KEY_ID_STRATEGY =
+            new RetryValidationStrategy(
+                    err ->
+                            ExceptionUtils.findThrowableWithMessage(
+                                                    err, "Access key ID cannot 
be blank.")
+                                            .isPresent()
+                                    || ExceptionUtils.findThrowableWithMessage(
+                                                    err,
+                                                    "Either the environment 
variable AWS_WEB_IDENTITY_TOKEN_FILE or the javaproperty 
aws.webIdentityTokenFile must be set.")
+                                            .isPresent(),

Review comment:
       Looks for messages is a bit fragile. Updating the AWS SDK could break 
this, and I doubt we would catch it in the test. Is there another way we can 
classify this?

##########
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkRetryValidationStrategies.java
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+
+/** Common retry strategies needed to fail fast on common errors. */
+@Internal
+public class AsyncSinkRetryValidationStrategies {
+    public static final RetryValidationStrategy INTERRUPTED_STRATEGY =
+            new RetryValidationStrategy(
+                    err ->
+                            ExceptionUtils.findThrowable(err, 
InterruptedException.class)
+                                    .isPresent(),
+                    err -> new FlinkException("Running job was cancelled"));

Review comment:
       > Running job was cancelled
   
   Are we confident enough to make this claim here? I do not think a Validation 
Strategy should know about Flink jobs. We can just more generally say "Thread 
was interrupted".

##########
File path: 
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java
##########
@@ -56,6 +62,61 @@
 class KinesisDataStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, 
PutRecordsRequestEntry> {
     private static final Logger LOG = 
LoggerFactory.getLogger(KinesisDataStreamsSinkWriter.class);
 
+    private static final RetryValidationStrategy INVALID_CREDENTIALS_STRATEGY =
+            new RetryValidationStrategy(
+                    err -> ExceptionUtils.findThrowable(err, 
StsException.class).isPresent(),
+                    err ->
+                            new KinesisDataStreamsException(
+                                    "Encountered non-recoverable exception 
relating to the provided credentials.",
+                                    err));
+    private static final RetryValidationStrategy RESOURCE_NOT_FOUND_STRATEGY =
+            new RetryValidationStrategy(
+                    err ->
+                            ExceptionUtils.findThrowable(err, 
ResourceNotFoundException.class)
+                                    .isPresent(),
+                    err ->
+                            new KinesisDataStreamsException(
+                                    "Encountered non-recoverable exception 
relating to not being able to find the specified resources",
+                                    err));
+    private static final RetryValidationStrategy 
SDK_CLIENT_MISCONFIGURED_STRATEGY =
+            new RetryValidationStrategy(
+                    err -> ExceptionUtils.findThrowable(err, 
SdkClientException.class).isPresent(),
+                    err ->
+                            new KinesisDataStreamsException(
+                                    "Encountered non-recoverable exception 
relating to mis-configured client",
+                                    err));
+    private static final RetryValidationStrategy 
MISSING_ACCESS_KEY_ID_STRATEGY =
+            new RetryValidationStrategy(
+                    err ->
+                            ExceptionUtils.findThrowableWithMessage(
+                                                    err, "Access key ID cannot 
be blank.")
+                                            .isPresent()
+                                    || ExceptionUtils.findThrowableWithMessage(
+                                                    err,
+                                                    "Either the environment 
variable AWS_WEB_IDENTITY_TOKEN_FILE or the javaproperty 
aws.webIdentityTokenFile must be set.")
+                                            .isPresent(),
+                    err ->
+                            new KinesisDataStreamsException(
+                                    "Encountered non-recoverable exception 
relating to missing credentials",
+                                    err));
+    private static final RetryValidationStrategy 
NON_RECOVERABLE_EXCEPTION_STRATEGY =
+            new RetryValidationStrategy(
+                    err -> ExceptionUtils.findThrowable(err, 
Error.class).isPresent(),
+                    err ->
+                            new KinesisDataStreamsException(
+                                    "Encountered non-recoverable exception in 
the Kinesis Data Streams Sink",
+                                    err));

Review comment:
       Same as above, can this pull pulled to module it can be reused from?

##########
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/RetryValidationStrategy.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+/** Class acting as a classifier for exception by failed requests to be 
retried by sink. */
+@Internal
+public class RetryValidationStrategy {

Review comment:
       The word `Validation` in this class is confusing. Seems like it is a 
strategy determining when to retry validation. Suggest we make it more general 
`RetryStrategy` or maybe `ExceptionClassifier` but make sure there are no 
clashes with existing classes. This class is in the `sink` package but it looks 
more general than sink

##########
File path: 
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java
##########
@@ -56,6 +62,61 @@
 class KinesisDataStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, 
PutRecordsRequestEntry> {
     private static final Logger LOG = 
LoggerFactory.getLogger(KinesisDataStreamsSinkWriter.class);
 
+    private static final RetryValidationStrategy INVALID_CREDENTIALS_STRATEGY =
+            new RetryValidationStrategy(
+                    err -> ExceptionUtils.findThrowable(err, 
StsException.class).isPresent(),
+                    err ->
+                            new KinesisDataStreamsException(
+                                    "Encountered non-recoverable exception 
relating to the provided credentials.",
+                                    err));
+    private static final RetryValidationStrategy RESOURCE_NOT_FOUND_STRATEGY =
+            new RetryValidationStrategy(
+                    err ->
+                            ExceptionUtils.findThrowable(err, 
ResourceNotFoundException.class)
+                                    .isPresent(),
+                    err ->
+                            new KinesisDataStreamsException(
+                                    "Encountered non-recoverable exception 
relating to not being able to find the specified resources",
+                                    err));
+    private static final RetryValidationStrategy 
SDK_CLIENT_MISCONFIGURED_STRATEGY =
+            new RetryValidationStrategy(
+                    err -> ExceptionUtils.findThrowable(err, 
SdkClientException.class).isPresent(),
+                    err ->
+                            new KinesisDataStreamsException(
+                                    "Encountered non-recoverable exception 
relating to mis-configured client",
+                                    err));

Review comment:
       This seems like it should be a class, that can be reused by Firehose 
sink, and other AWS sinks

##########
File path: 
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/RetryValidationStrategyTest.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** Tests the RetryValidationStrategy of the Async Sink Writer. */
+public class RetryValidationStrategyTest {
+
+    private static Integer nullReference;
+
+    private static final RetryValidationStrategy ARITHMETIC_EXCEPTION_STRATEGY 
=
+            new RetryValidationStrategy(
+                    err -> ExceptionUtils.findThrowable(err, 
ArithmeticException.class).isPresent(),
+                    err ->
+                            new RuntimeException(
+                                    "Buffer manipulation calculations resulted 
in a calculation exception",
+                                    err));
+
+    private static final RetryValidationStrategy 
NULL_POINTER_EXCEPTION_STRATEGY =
+            new RetryValidationStrategy(
+                    err ->
+                            ExceptionUtils.findThrowable(err, 
NullPointerException.class)
+                                    .isPresent(),
+                    err ->
+                            new RuntimeException(
+                                    "Buffer manipulation calculations resulted 
in a reference exception",
+                                    err));
+
+    @Test
+    public void 
exceptionsAreWrappedInTheContainingExceptionWhenAMatchIsFound() {
+        AtomicReference<Exception> caughtExceptionReference = new 
AtomicReference<>();
+        try {
+            int failedCalculation = 100 / 0;
+        } catch (Exception e) {
+            ARITHMETIC_EXCEPTION_STRATEGY.shouldRetry(e, 
caughtExceptionReference::set);

Review comment:
       nit: Why not just instantiate and pass the exception in?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to