This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 9412c99  [FLINK-25811][connector/base] changing failed requests handler to accept list in AsyncSinkWriter
9412c99 is described below

commit 9412c992e50004eb2412530a3b1c0a288fc5fc64
Author: Ahmed Hamdy <vah...@amazon.com>
AuthorDate: Tue Jan 25 15:34:52 2022 +0000

    [FLINK-25811][connector/base] changing failed requests handler to accept list in AsyncSinkWriter
---
 .../kinesis/sink/KinesisDataStreamsSinkWriter.java       |  7 +++----
 .../connector/base/sink/writer/AsyncSinkWriter.java      | 16 +++++++++-------
 .../flink/connector/base/sink/ArrayListAsyncSink.java    |  2 +-
 .../connector/base/sink/writer/AsyncSinkWriterTest.java  |  5 ++---
 4 files changed, 15 insertions(+), 15 deletions(-)

diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java
index 984998f..b720e5e 100644
--- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java
+++ b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java
@@ -36,7 +36,6 @@ import software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry;
 import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
@@ -112,7 +111,7 @@ class KinesisDataStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRe
     @Override
     protected void submitRequestEntries(
             List<PutRecordsRequestEntry> requestEntries,
-            Consumer<Collection<PutRecordsRequestEntry>> requestResult) {
+            Consumer<List<PutRecordsRequestEntry>> requestResult) {
 
         PutRecordsRequest batchRequest =
                 PutRecordsRequest.builder().records(requestEntries).streamName(streamName).build();
@@ -141,7 +140,7 @@ class KinesisDataStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRe
     private void handleFullyFailedRequest(
             Throwable err,
             List<PutRecordsRequestEntry> requestEntries,
-            Consumer<Collection<PutRecordsRequestEntry>> requestResult) {
+            Consumer<List<PutRecordsRequestEntry>> requestResult) {
         LOG.warn("KDS Sink failed to persist {} entries to KDS", requestEntries.size(), err);
         numRecordsOutErrorsCounter.inc(requestEntries.size());
 
@@ -153,7 +152,7 @@ class KinesisDataStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRe
     private void handlePartiallyFailedRequest(
             PutRecordsResponse response,
             List<PutRecordsRequestEntry> requestEntries,
-            Consumer<Collection<PutRecordsRequestEntry>> requestResult) {
+            Consumer<List<PutRecordsRequestEntry>> requestResult) {
         LOG.warn("KDS Sink failed to persist {} entries to KDS", response.failedRecordCount());
         numRecordsOutErrorsCounter.inc(response.failedRecordCount());
 
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
index fa158fe..263164e 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
@@ -34,6 +34,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.List;
+import java.util.ListIterator;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
@@ -186,7 +187,7 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
      *     method. All other elements are assumed to have been successfully persisted.
      */
     protected abstract void submitRequestEntries(
-            List<RequestEntryT> requestEntries, Consumer<Collection<RequestEntryT>> requestResult);
+            List<RequestEntryT> requestEntries, Consumer<List<RequestEntryT>> requestResult);
 
     /**
      * This method allows the getting of the size of a {@code RequestEntryT} in bytes. The size in
@@ -300,7 +301,7 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
         }
 
         long timestampOfRequest = System.currentTimeMillis();
-        Consumer<Collection<RequestEntryT>> requestResult =
+        Consumer<List<RequestEntryT>> requestResult =
                 failedRequestEntries ->
                         mailboxExecutor.execute(
                                 () -> completeRequest(failedRequestEntries, timestampOfRequest),
@@ -343,14 +344,15 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
      *
      * @param failedRequestEntries requestEntries that need to be retried
      */
-    private void completeRequest(
-            Collection<RequestEntryT> failedRequestEntries, long requestStartTime) {
+    private void completeRequest(List<RequestEntryT> failedRequestEntries, long requestStartTime) {
         lastSendTimestamp = requestStartTime;
         ackTime = System.currentTimeMillis();
         inFlightRequestsCount--;
-        List<RequestEntryT> requestsList = new ArrayList<>(failedRequestEntries);
-        Collections.reverse(requestsList);
-        requestsList.forEach(failedEntry -> addEntryToBuffer(failedEntry, true));
+        ListIterator<RequestEntryT> iterator =
+                failedRequestEntries.listIterator(failedRequestEntries.size());
+        while (iterator.hasPrevious()) {
+            addEntryToBuffer(iterator.previous(), true);
+        }
     }
 
     private void addEntryToBuffer(RequestEntryT entry, boolean insertAtHead) {
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/ArrayListAsyncSink.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/ArrayListAsyncSink.java
index dc48db8..946e976 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/ArrayListAsyncSink.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/ArrayListAsyncSink.java
@@ -69,7 +69,7 @@ public class ArrayListAsyncSink extends AsyncSinkBase<String, Integer> {
 
             @Override
             protected void submitRequestEntries(
-                    List<Integer> requestEntries, Consumer<Collection<Integer>> requestResult) {
+                    List<Integer> requestEntries, Consumer<List<Integer>> requestResult) {
                 try {
                     ArrayListDestination.putRecords(requestEntries);
                 } catch (RuntimeException e) {
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
index 4352305..b508159 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
@@ -40,7 +40,6 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
@@ -886,7 +885,7 @@ public class AsyncSinkWriterTest {
          */
         @Override
         protected void submitRequestEntries(
-                List<Integer> requestEntries, Consumer<Collection<Integer>> requestResult) {
+                List<Integer> requestEntries, Consumer<List<Integer>> requestResult) {
             maybeDelay();
 
             if (requestEntries.stream().anyMatch(val -> val > 100 && val <= 200)) {
@@ -1146,7 +1145,7 @@ public class AsyncSinkWriterTest {
 
         @Override
         protected void submitRequestEntries(
-                List<Integer> requestEntries, Consumer<Collection<Integer>> requestResult) {
+                List<Integer> requestEntries, Consumer<List<Integer>> requestResult) {
             if (requestEntries.size() == 3) {
                 try {
                     delayedStartLatch.countDown();

Reply via email to