dannycranmer commented on code in PR #24839: URL: https://github.com/apache/flink/pull/24839#discussion_r1613883830
########## flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/AsyncSinkBase.java: ########## @@ -54,6 +54,8 @@ public abstract class AsyncSinkBase<InputT, RequestEntryT extends Serializable> private final long maxBatchSizeInBytes; private final long maxTimeInBufferMS; private final long maxRecordSizeInBytes; + private Long requestTimeoutMS; + private Boolean failOnTimeout; Review Comment: Why `Boolean` and not `boolean` ? ########## flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/AsyncSinkBase.java: ########## @@ -54,6 +54,8 @@ public abstract class AsyncSinkBase<InputT, RequestEntryT extends Serializable> private final long maxBatchSizeInBytes; private final long maxTimeInBufferMS; private final long maxRecordSizeInBytes; + private Long requestTimeoutMS; + private Boolean failOnTimeout; Review Comment: Also, to avoid null checks could default request timeout ms to -1 for off ########## flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/AsyncSinkBase.java: ########## @@ -54,6 +54,8 @@ public abstract class AsyncSinkBase<InputT, RequestEntryT extends Serializable> private final long maxBatchSizeInBytes; private final long maxTimeInBufferMS; private final long maxRecordSizeInBytes; + private Long requestTimeoutMS; + private Boolean failOnTimeout; Review Comment: Make `final` ########## flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java: ########## @@ -181,15 +187,88 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable * <p>During checkpointing, the sink needs to ensure that there are no outstanding in-flight * requests. * + * <p>This method is deprecated in favor of {@code submitRequestEntries( List<RequestEntryT> + * requestEntries, ResultHandler<RequestEntryT> resultHandler)} + * * @param requestEntries a set of request entries that should be sent to the destination * @param requestToRetry the {@code accept} method should be called on this Consumer once the * processing of the {@code requestEntries} are complete. Any entries that encountered * difficulties in persisting should be re-queued through {@code requestToRetry} by * including that element in the collection of {@code RequestEntryT}s passed to the {@code * accept} method. All other elements are assumed to have been successfully persisted. */ - protected abstract void submitRequestEntries( - List<RequestEntryT> requestEntries, Consumer<List<RequestEntryT>> requestToRetry); + @Deprecated + protected void submitRequestEntries( + List<RequestEntryT> requestEntries, Consumer<List<RequestEntryT>> requestToRetry) { + throw new UnsupportedOperationException( + "This method is deprecated. Please override the method that accepts a ResultHandler."); + } + + /** + * This method specifies how to persist buffered request entries into the destination. It is + * implemented when support for a new destination is added. + * + * <p>The method is invoked with a set of request entries according to the buffering hints (and + * the valid limits of the destination). The logic then needs to create and execute the request + * asynchronously against the destination (ideally by batching together multiple request entries + * to increase efficiency). The logic also needs to identify individual request entries that + * were not persisted successfully and resubmit them using the {@code requestToRetry} callback. + * + * <p>From a threading perspective, the mailbox thread will call this method and initiate the + * asynchronous request to persist the {@code requestEntries}. NOTE: The client must support Review Comment: Well... You could spin up a thread pool in the sink, and not necessarily in the client -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org