dannycranmer commented on code in PR #24839:
URL: https://github.com/apache/flink/pull/24839#discussion_r1613883830


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/AsyncSinkBase.java:
##########
@@ -54,6 +54,8 @@ public abstract class AsyncSinkBase<InputT, RequestEntryT extends Serializable>
     private final long maxBatchSizeInBytes;
     private final long maxTimeInBufferMS;
     private final long maxRecordSizeInBytes;
+    private Long requestTimeoutMS;
+    private Boolean failOnTimeout;

Review Comment:
   Why `Boolean` and not `boolean`?
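
   For context, a minimal sketch of the usual concern behind this kind of question. The classes `BoxedConfig` and `PrimitiveConfig` are hypothetical stand-ins, not the actual `AsyncSinkBase`; they only illustrate that an unassigned boxed field is `null` and fails on auto-unboxing, while a primitive field always holds a value.

```java
// Hypothetical stand-ins for the fields under review; not the real AsyncSinkBase.
class BoxedConfig {
    // Boxed type: stays null if it is never assigned.
    private Boolean failOnTimeout;

    boolean shouldFail() {
        // Auto-unboxing a null Boolean throws NullPointerException here.
        return failOnTimeout;
    }
}

class PrimitiveConfig {
    // Primitive type: always true or false, so no null check is needed.
    private final boolean failOnTimeout;

    PrimitiveConfig(boolean failOnTimeout) {
        this.failOnTimeout = failOnTimeout;
    }

    boolean shouldFail() {
        return failOnTimeout;
    }
}
```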



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/AsyncSinkBase.java:
##########
@@ -54,6 +54,8 @@ public abstract class AsyncSinkBase<InputT, RequestEntryT extends Serializable>
     private final long maxBatchSizeInBytes;
     private final long maxTimeInBufferMS;
     private final long maxRecordSizeInBytes;
+    private Long requestTimeoutMS;
+    private Boolean failOnTimeout;

Review Comment:
   Also, to avoid null checks, `requestTimeoutMS` could default to -1 to mean the timeout is off.
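
   A minimal sketch of the sentinel approach, under stated assumptions: the class `TimeoutConfig`, the constant `NO_TIMEOUT`, and the method `isTimeoutEnabled` are hypothetical names that do not appear in the PR, and treating every negative value as "off" is one possible convention.

```java
// Hypothetical sketch; names are illustrative and not taken from the PR.
final class TimeoutConfig {
    // Sentinel meaning "request timeout disabled".
    static final long NO_TIMEOUT = -1L;

    // Primitive and final: never null, so callers need no null checks.
    private final long requestTimeoutMS;

    // Default construction leaves the timeout switched off.
    TimeoutConfig() {
        this(NO_TIMEOUT);
    }

    TimeoutConfig(long requestTimeoutMS) {
        this.requestTimeoutMS = requestTimeoutMS;
    }

    // Assumption: any negative value is treated as "off".
    boolean isTimeoutEnabled() {
        return requestTimeoutMS >= 0;
    }

    long getRequestTimeoutMS() {
        return requestTimeoutMS;
    }
}
```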



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/AsyncSinkBase.java:
##########
@@ -54,6 +54,8 @@ public abstract class AsyncSinkBase<InputT, RequestEntryT extends Serializable>
     private final long maxBatchSizeInBytes;
     private final long maxTimeInBufferMS;
     private final long maxRecordSizeInBytes;
+    private Long requestTimeoutMS;
+    private Boolean failOnTimeout;

Review Comment:
   Make `final`



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -181,15 +187,88 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
      * <p>During checkpointing, the sink needs to ensure that there are no outstanding in-flight
      * requests.
      *
+     * <p>This method is deprecated in favor of {@code submitRequestEntries( List<RequestEntryT>
+     * requestEntries, ResultHandler<RequestEntryT> resultHandler)}
+     *
      * @param requestEntries a set of request entries that should be sent to the destination
      * @param requestToRetry the {@code accept} method should be called on this Consumer once the
      *     processing of the {@code requestEntries} are complete. Any entries that encountered
      *     difficulties in persisting should be re-queued through {@code requestToRetry} by
      *     including that element in the collection of {@code RequestEntryT}s passed to the {@code
      *     accept} method. All other elements are assumed to have been successfully persisted.
      */
-    protected abstract void submitRequestEntries(
-            List<RequestEntryT> requestEntries, Consumer<List<RequestEntryT>> requestToRetry);
+    @Deprecated
+    protected void submitRequestEntries(
+            List<RequestEntryT> requestEntries, Consumer<List<RequestEntryT>> requestToRetry) {
+        throw new UnsupportedOperationException(
+                "This method is deprecated. Please override the method that accepts a ResultHandler.");
+    }
+
+    /**
+     * This method specifies how to persist buffered request entries into the destination. It is
+     * implemented when support for a new destination is added.
+     *
+     * <p>The method is invoked with a set of request entries according to the buffering hints (and
+     * the valid limits of the destination). The logic then needs to create and execute the request
+     * asynchronously against the destination (ideally by batching together multiple request entries
+     * to increase efficiency). The logic also needs to identify individual request entries that
+     * were not persisted successfully and resubmit them using the {@code requestToRetry} callback.
+     *
+     * <p>From a threading perspective, the mailbox thread will call this method and initiate the
+     * asynchronous request to persist the {@code requestEntries}. NOTE: The client must support

Review Comment:
   Well... you could spin up a thread pool in the sink; it does not necessarily have to be in the client.
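
   A rough sketch of what that could look like, assuming a destination client that only offers a blocking call. `BlockingClient` and `PooledWriter` are hypothetical names and this is not the real `AsyncSinkWriter` API; it only shows the sink owning the executor, so that the calling thread returns immediately and the retry callback fires when the blocking call completes. Failure handling (an exception thrown by the client) is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical blocking client: stands in for a destination SDK without async support.
interface BlockingClient<T> {
    // Sends the batch synchronously and returns the entries that were not persisted.
    List<T> send(List<T> batch);
}

// Hypothetical writer fragment; not the real AsyncSinkWriter.
class PooledWriter<T> implements AutoCloseable {
    private final BlockingClient<T> client;
    // The thread pool is owned by the sink, not by the client.
    private final ExecutorService pool;

    PooledWriter(BlockingClient<T> client, int threads) {
        this.client = client;
        this.pool = Executors.newFixedThreadPool(threads);
    }

    // Returns immediately; the blocking send runs on the sink-owned pool.
    CompletableFuture<Void> submitRequestEntries(
            List<T> requestEntries, Consumer<List<T>> requestToRetry) {
        // Copy the batch so later changes by the caller do not affect the in-flight request.
        List<T> snapshot = new ArrayList<>(requestEntries);
        return CompletableFuture.supplyAsync(() -> client.send(snapshot), pool)
                // Hand the failed entries back for re-queueing once the call finishes.
                .thenAccept(requestToRetry);
    }

    @Override
    public void close() {
        pool.shutdown();
    }
}
```

   Note that in this sketch the callback runs on a pool thread, so a real writer would still need to hand the result back to the mailbox thread.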



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
