xishuaidelin commented on code in PR #27357:
URL: https://github.com/apache/flink/pull/27357#discussion_r2922635990


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/OrderedAsyncBatchWaitOperator.java:
##########
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import 
org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback;
+import org.apache.flink.streaming.api.functions.async.AsyncBatchFunction;
+import org.apache.flink.streaming.api.functions.async.CollectionSupplier;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * The {@link OrderedAsyncBatchWaitOperator} batches incoming stream records 
and invokes the {@link
+ * AsyncBatchFunction} when the batch size reaches the configured maximum or 
when the batch timeout
+ * is reached.
+ *
+ * <p>This operator implements <b>ordered semantics</b> - output records are 
emitted in the same
+ * order as input records, even though async batch invocations may complete 
out-of-order internally.
+ *
+ * <p>Ordering is achieved by:
+ *
+ * <ul>
+ *   <li>Assigning a monotonic sequence number to each batch
+ *   <li>Buffering completed batch results in a pending results map
+ *   <li>Emitting results strictly in sequence order
+ * </ul>
+ *
+ * <p>Key behaviors:
+ *
+ * <ul>
+ *   <li>Buffer incoming records until batch size is reached OR timeout expires
+ *   <li>Flush remaining records when end of input is signaled
+ *   <li>Wait for all batches to complete and emit in order before finishing
+ * </ul>
+ *
+ * <p>Timer lifecycle (when batchTimeoutMs > 0):
+ *
+ * <ul>
+ *   <li>Timer is registered when first element is added to an empty buffer
+ *   <li>Timer fires at: currentBatchStartTime + batchTimeoutMs
+ *   <li>Timer is cleared when batch is flushed (by size, timeout, or 
end-of-input)
+ *   <li>At most one timer is active at any time
+ * </ul>
+ *
+ * <p>Future enhancements may include:
+ *
+ * <ul>
+ *   <li>Event-time or watermark-based ordering
+ *   <li>Multiple inflight batches concurrency control
+ *   <li>Retry logic
+ *   <li>Metrics
+ * </ul>
+ *
+ * @param <IN> Input type for the operator.
+ * @param <OUT> Output type for the operator.
+ */
+@Internal
+public class OrderedAsyncBatchWaitOperator<IN, OUT> extends 
AbstractStreamOperator<OUT>

Review Comment:
   There is a lot of duplicated logic between OrderedAsyncBatchWaitOperator and 
AsyncBatchWaitOperator. Could we refactor OrderedAsyncBatchWaitOperator to 
extend AsyncBatchWaitOperator?



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/OrderedAsyncBatchWaitOperator.java:
##########
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import 
org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback;
+import org.apache.flink.streaming.api.functions.async.AsyncBatchFunction;
+import org.apache.flink.streaming.api.functions.async.CollectionSupplier;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * The {@link OrderedAsyncBatchWaitOperator} batches incoming stream records 
and invokes the {@link
+ * AsyncBatchFunction} when the batch size reaches the configured maximum or 
when the batch timeout
+ * is reached.
+ *
+ * <p>This operator implements <b>ordered semantics</b> - output records are 
emitted in the same
+ * order as input records, even though async batch invocations may complete 
out-of-order internally.
+ *
+ * <p>Ordering is achieved by:
+ *
+ * <ul>
+ *   <li>Assigning a monotonic sequence number to each batch
+ *   <li>Buffering completed batch results in a pending results map
+ *   <li>Emitting results strictly in sequence order
+ * </ul>
+ *
+ * <p>Key behaviors:
+ *
+ * <ul>
+ *   <li>Buffer incoming records until batch size is reached OR timeout expires
+ *   <li>Flush remaining records when end of input is signaled
+ *   <li>Wait for all batches to complete and emit in order before finishing
+ * </ul>
+ *
+ * <p>Timer lifecycle (when batchTimeoutMs > 0):
+ *
+ * <ul>
+ *   <li>Timer is registered when first element is added to an empty buffer
+ *   <li>Timer fires at: currentBatchStartTime + batchTimeoutMs
+ *   <li>Timer is cleared when batch is flushed (by size, timeout, or 
end-of-input)
+ *   <li>At most one timer is active at any time
+ * </ul>
+ *
+ * <p>Future enhancements may include:
+ *
+ * <ul>
+ *   <li>Event-time or watermark-based ordering
+ *   <li>Multiple inflight batches concurrency control
+ *   <li>Retry logic
+ *   <li>Metrics
+ * </ul>
+ *
+ * @param <IN> Input type for the operator.
+ * @param <OUT> Output type for the operator.
+ */
+@Internal
+public class OrderedAsyncBatchWaitOperator<IN, OUT> extends 
AbstractStreamOperator<OUT>
+        implements OneInputStreamOperator<IN, OUT>, BoundedOneInput, 
ProcessingTimeCallback {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Constant indicating timeout is disabled. */
+    private static final long NO_TIMEOUT = 0L;
+
+    /** The async batch function to invoke. */
+    private final AsyncBatchFunction<IN, OUT> asyncBatchFunction;
+
+    /** Maximum batch size before triggering async invocation. */
+    private final int maxBatchSize;
+
+    /**
+     * Batch timeout in milliseconds. When positive, a timer is registered to 
flush the batch after
+     * this duration since the first buffered element. A value <= 0 disables 
timeout-based batching.
+     */
+    private final long batchTimeoutMs;
+
+    /** Buffer for incoming stream records. */
+    private transient List<IN> buffer;
+
+    /** Mailbox executor for processing async results on the main thread. */
+    private final transient MailboxExecutor mailboxExecutor;
+
+    /** Counter for in-flight async operations. */
+    private transient int inFlightCount;
+
+    // 
================================================================================
+    //  Timer state fields for timeout-based batching
+    // 
================================================================================
+
+    /**
+     * The processing time when the current batch started (i.e., when first 
element was added to
+     * empty buffer). Used to calculate timer fire time.
+     */
+    private transient long currentBatchStartTime;
+
+    /** Whether a timer is currently registered for the current batch. */
+    private transient boolean timerRegistered;
+
+    // 
================================================================================
+    //  Ordered emission state fields
+    // 
================================================================================
+
+    /**
+     * The sequence number to assign to the next batch. Monotonically 
increasing, starting from 0.
+     */
+    private transient long nextBatchSequenceNumber;
+
+    /**
+     * The sequence number of the next batch whose results should be emitted. 
Used to ensure
+     * strictly ordered output emission.
+     */
+    private transient long nextExpectedSequenceNumber;
+
+    /**
+     * Pending results buffer. Maps batch sequence number to completed 
results. Results are held
+     * here until all preceding batches have been emitted.
+     */
+    private transient Map<Long, Collection<OUT>> pendingResults;

Review Comment:
   The current approach uses a TreeMap combined with nextBatchSequenceNumber and 
nextExpectedSequenceNumber to manually simulate FIFO ordering. 
   
   This introduces unnecessary complexity. How about refactoring this to adopt 
the standard FIFO queue pattern that already exists in AsyncWaitOperator? That 
would simplify the logic and make the ordering guarantees more robust.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java:
##########
@@ -319,4 +323,159 @@ public static <IN, OUT> SingleOutputStreamOperator<OUT> 
orderedWaitWithRetry(
                 OutputMode.ORDERED,
                 asyncRetryStrategy);
     }
+
+    // 
================================================================================
+    //  Batch Async Operations
+    // 
================================================================================
+
+    /**
+     * Adds an AsyncBatchWaitOperator to process elements in batches. The 
order of output stream
+     * records may be reordered (unordered mode).
+     *
+     * <p>This method is particularly useful for high-latency inference 
workloads where batching can
+     * significantly improve throughput, such as machine learning model 
inference.
+     *
+     * <p>The operator buffers incoming elements and triggers the async batch 
function when the
+     * buffer reaches {@code maxBatchSize}. Remaining elements are flushed 
when the input ends.
+     *
+     * @param in Input {@link DataStream}
+     * @param func {@link AsyncBatchFunction} to process batches of elements
+     * @param maxBatchSize Maximum number of elements to batch before 
triggering async invocation
+     * @param <IN> Type of input record
+     * @param <OUT> Type of output record
+     * @return A new {@link SingleOutputStreamOperator}
+     */
+    public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWaitBatch(
+            DataStream<IN> in, AsyncBatchFunction<IN, OUT> func, int 
maxBatchSize) {
+        return unorderedWaitBatch(in, func, maxBatchSize, 0L);
+    }
+
+    /**
+     * Adds an AsyncBatchWaitOperator to process elements in batches with 
timeout support. The order
+     * of output stream records may be reordered (unordered mode).
+     *
+     * <p>This method is particularly useful for high-latency inference 
workloads where batching can
+     * significantly improve throughput, such as machine learning model 
inference.
+     *
+     * <p>The operator buffers incoming elements and triggers the async batch 
function when either:
+     *
+     * <ul>
+     *   <li>The buffer reaches {@code maxBatchSize}
+     *   <li>The {@code batchTimeoutMs} has elapsed since the first buffered 
element (if timeout is
+     *       enabled)
+     * </ul>
+     *
+     * <p>Remaining elements are flushed when the input ends.
+     *
+     * @param in Input {@link DataStream}
+     * @param func {@link AsyncBatchFunction} to process batches of elements
+     * @param maxBatchSize Maximum number of elements to batch before 
triggering async invocation
+     * @param batchTimeoutMs Batch timeout in milliseconds; <= 0 means timeout 
is disabled
+     * @param <IN> Type of input record
+     * @param <OUT> Type of output record
+     * @return A new {@link SingleOutputStreamOperator}
+     */
+    public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWaitBatch(
+            DataStream<IN> in,
+            AsyncBatchFunction<IN, OUT> func,
+            int maxBatchSize,
+            long batchTimeoutMs) {
+        Preconditions.checkArgument(maxBatchSize > 0, "maxBatchSize must be 
greater than 0");
+
+        TypeInformation<OUT> outTypeInfo =
+                TypeExtractor.getUnaryOperatorReturnType(
+                        func,
+                        AsyncBatchFunction.class,
+                        0,
+                        1,
+                        new int[] {1, 0},
+                        in.getType(),
+                        Utils.getCallLocationName(),
+                        true);
+
+        // create transform
+        AsyncBatchWaitOperatorFactory<IN, OUT> operatorFactory =
+                new AsyncBatchWaitOperatorFactory<>(
+                        in.getExecutionEnvironment().clean(func), 
maxBatchSize, batchTimeoutMs);
+
+        return in.transform("async batch wait operator", outTypeInfo, 
operatorFactory);
+    }
+
+    /**
+     * Adds an AsyncBatchWaitOperator to process elements in batches with 
ordered output. The order
+     * of output stream records is guaranteed to be the same as input order.
+     *
+     * <p>This method is particularly useful for high-latency inference 
workloads where batching can
+     * significantly improve throughput while maintaining ordering guarantees, 
such as machine
+     * learning model inference with order-sensitive downstream processing.
+     *
+     * <p>The operator buffers incoming elements and triggers the async batch 
function when either:
+     *
+     * <ul>
+     *   <li>The buffer reaches {@code maxBatchSize}
+     *   <li>The {@code maxWaitTime} has elapsed since the first buffered 
element
+     * </ul>
+     *
+     * <p>Results are buffered and emitted in the original input order, 
regardless of async
+     * completion order.
+     *
+     * @param in Input {@link DataStream}
+     * @param func {@link AsyncBatchFunction} to process batches of elements
+     * @param maxBatchSize Maximum number of elements to batch before 
triggering async invocation
+     * @param maxWaitTime Maximum duration to wait before flushing a partial 
batch; Duration.ZERO or
+     *     negative means timeout is disabled
+     * @param <IN> Type of input record
+     * @param <OUT> Type of output record
+     * @return A new {@link SingleOutputStreamOperator}
+     */
+    public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWaitBatch(
+            DataStream<IN> in,
+            AsyncBatchFunction<IN, OUT> func,
+            int maxBatchSize,
+            Duration maxWaitTime) {

Review Comment:
   Nit: The time interval is represented inconsistently across these 
signatures: this method takes a `Duration maxWaitTime`, whereas the similar 
method below takes a `long batchTimeoutMs`.
   `unorderedWaitBatch(
               DataStream<IN> in,
               AsyncBatchFunction<IN, OUT> func,
               int maxBatchSize,
               long batchTimeoutMs)`



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/OrderedAsyncBatchWaitOperatorTest.java:
##########
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
+import org.apache.flink.streaming.api.functions.async.AsyncBatchFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for {@link OrderedAsyncBatchWaitOperator}.
+ *
+ * <p>These tests verify:
+ *
+ * <ul>
+ *   <li>Strict ordering guarantee - output order matches input order
+ *   <li>Batch + time trigger interaction with ordering
+ *   <li>Exception propagation
+ * </ul>
+ */
+@Timeout(value = 100, unit = TimeUnit.SECONDS)
+class OrderedAsyncBatchWaitOperatorTest {
+
+    /**
+     * Test strict ordering guarantee even when async results complete out of 
order.
+     *
+     * <p>Inputs: [1, 2, 3, 4, 5]
+     *
+     * <p>Async batches complete in reverse order (second batch completes 
before first)
+     *
+     * <p>Output MUST be: [1, 2, 3, 4, 5] (same as input order)
+     */
+    @Test
+    void testStrictOrderingGuarantee() throws Exception {
+        final int maxBatchSize = 3;
+        final List<CompletableFuture<Void>> batchFutures = new 
CopyOnWriteArrayList<>();
+        final AtomicInteger batchIndex = new AtomicInteger(0);
+
+        AsyncBatchFunction<Integer, Integer> function =
+                (inputs, resultFuture) -> {
+                    int currentBatch = batchIndex.getAndIncrement();
+                    CompletableFuture<Void> future = new CompletableFuture<>();
+                    batchFutures.add(future);
+
+                    // Store input for later completion
+                    List<Integer> inputCopy = new ArrayList<>(inputs);
+
+                    // Complete asynchronously when future is completed 
externally
+                    future.thenRun(() -> resultFuture.complete(inputCopy));
+                };
+
+        try (OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
+                createTestHarness(function, maxBatchSize)) {
+
+            testHarness.open();
+
+            // Process 5 elements: batch 0 = [1,2,3], batch 1 = [4,5]
+            for (int i = 1; i <= 5; i++) {
+                testHarness.processElement(new StreamRecord<>(i, i));
+            }
+
+            // Trigger end of input to flush remaining elements
+            // This creates batch 1 with [4, 5]
+            // At this point we have 2 batches pending
+
+            // Wait for batches to be created
+            while (batchFutures.size() < 2) {
+                Thread.sleep(10);
+            }
+
+            // Complete batches in REVERSE order (batch 1 first, then batch 0)
+            // This tests that output is still in original order
+            batchFutures.get(1).complete(null); // Complete batch [4, 5] first
+            Thread.sleep(50); // Give time for async processing

Review Comment:
   Relying on `Thread.sleep` for synchronization here would cause flaky tests 
in CI. Please switch to a robust synchronization mechanism such as 
CountDownLatch instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to