This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 9412c99 [FLINK-25811][connector/base] changing failed requests handler to accept list in AsyncSinkWriter 9412c99 is described below commit 9412c992e50004eb2412530a3b1c0a288fc5fc64 Author: Ahmed Hamdy <vah...@amazon.com> AuthorDate: Tue Jan 25 15:34:52 2022 +0000 [FLINK-25811][connector/base] changing failed requests handler to accept list in AsyncSinkWriter --- .../kinesis/sink/KinesisDataStreamsSinkWriter.java | 7 +++---- .../connector/base/sink/writer/AsyncSinkWriter.java | 16 +++++++++------- .../flink/connector/base/sink/ArrayListAsyncSink.java | 2 +- .../connector/base/sink/writer/AsyncSinkWriterTest.java | 5 ++--- 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java index 984998f..b720e5e 100644 --- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java +++ b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java @@ -36,7 +36,6 @@ import software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry; import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Properties; @@ -112,7 +111,7 @@ class KinesisDataStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRe @Override protected void submitRequestEntries( List<PutRecordsRequestEntry> requestEntries, - Consumer<Collection<PutRecordsRequestEntry>> requestResult) { + 
Consumer<List<PutRecordsRequestEntry>> requestResult) { PutRecordsRequest batchRequest = PutRecordsRequest.builder().records(requestEntries).streamName(streamName).build(); @@ -141,7 +140,7 @@ class KinesisDataStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRe private void handleFullyFailedRequest( Throwable err, List<PutRecordsRequestEntry> requestEntries, - Consumer<Collection<PutRecordsRequestEntry>> requestResult) { + Consumer<List<PutRecordsRequestEntry>> requestResult) { LOG.warn("KDS Sink failed to persist {} entries to KDS", requestEntries.size(), err); numRecordsOutErrorsCounter.inc(requestEntries.size()); @@ -153,7 +152,7 @@ class KinesisDataStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRe private void handlePartiallyFailedRequest( PutRecordsResponse response, List<PutRecordsRequestEntry> requestEntries, - Consumer<Collection<PutRecordsRequestEntry>> requestResult) { + Consumer<List<PutRecordsRequestEntry>> requestResult) { LOG.warn("KDS Sink failed to persist {} entries to KDS", response.failedRecordCount()); numRecordsOutErrorsCounter.inc(response.failedRecordCount()); diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java index fa158fe..263164e 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java @@ -34,6 +34,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Deque; import java.util.List; +import java.util.ListIterator; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -186,7 +187,7 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable * method. 
All other elements are assumed to have been successfully persisted. */ protected abstract void submitRequestEntries( - List<RequestEntryT> requestEntries, Consumer<Collection<RequestEntryT>> requestResult); + List<RequestEntryT> requestEntries, Consumer<List<RequestEntryT>> requestResult); /** * This method allows the getting of the size of a {@code RequestEntryT} in bytes. The size in @@ -300,7 +301,7 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable } long timestampOfRequest = System.currentTimeMillis(); - Consumer<Collection<RequestEntryT>> requestResult = + Consumer<List<RequestEntryT>> requestResult = failedRequestEntries -> mailboxExecutor.execute( () -> completeRequest(failedRequestEntries, timestampOfRequest), @@ -343,14 +344,15 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable * * @param failedRequestEntries requestEntries that need to be retried */ - private void completeRequest( - Collection<RequestEntryT> failedRequestEntries, long requestStartTime) { + private void completeRequest(List<RequestEntryT> failedRequestEntries, long requestStartTime) { lastSendTimestamp = requestStartTime; ackTime = System.currentTimeMillis(); inFlightRequestsCount--; - List<RequestEntryT> requestsList = new ArrayList<>(failedRequestEntries); - Collections.reverse(requestsList); - requestsList.forEach(failedEntry -> addEntryToBuffer(failedEntry, true)); + ListIterator<RequestEntryT> iterator = + failedRequestEntries.listIterator(failedRequestEntries.size()); + while (iterator.hasPrevious()) { + addEntryToBuffer(iterator.previous(), true); + } } private void addEntryToBuffer(RequestEntryT entry, boolean insertAtHead) { diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/ArrayListAsyncSink.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/ArrayListAsyncSink.java index dc48db8..946e976 100644 --- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/ArrayListAsyncSink.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/ArrayListAsyncSink.java @@ -69,7 +69,7 @@ public class ArrayListAsyncSink extends AsyncSinkBase<String, Integer> { @Override protected void submitRequestEntries( - List<Integer> requestEntries, Consumer<Collection<Integer>> requestResult) { + List<Integer> requestEntries, Consumer<List<Integer>> requestResult) { try { ArrayListDestination.putRecords(requestEntries); } catch (RuntimeException e) { diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java index 4352305..b508159 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java @@ -40,7 +40,6 @@ import org.junit.Test; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Optional; @@ -886,7 +885,7 @@ public class AsyncSinkWriterTest { */ @Override protected void submitRequestEntries( - List<Integer> requestEntries, Consumer<Collection<Integer>> requestResult) { + List<Integer> requestEntries, Consumer<List<Integer>> requestResult) { maybeDelay(); if (requestEntries.stream().anyMatch(val -> val > 100 && val <= 200)) { @@ -1146,7 +1145,7 @@ public class AsyncSinkWriterTest { @Override protected void submitRequestEntries( - List<Integer> requestEntries, Consumer<Collection<Integer>> requestResult) { + List<Integer> requestEntries, Consumer<List<Integer>> requestResult) { if 
(requestEntries.size() == 3) { try { delayedStartLatch.countDown();