This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f1ecb9e4701 [FLINK-35030][runtime] Introduce Epoch Manager for under async execution (#24748)
f1ecb9e4701 is described below

commit f1ecb9e4701d612050da54589a8f561857debf34
Author: Yanfei Lei <fredia...@gmail.com>
AuthorDate: Fri May 17 13:24:38 2024 +0800

    [FLINK-35030][runtime] Introduce Epoch Manager for under async execution (#24748)
---
 .../asyncprocessing/AsyncExecutionController.java  |  41 +++-
 .../runtime/asyncprocessing/EpochManager.java      | 210 +++++++++++++++++++++
 .../runtime/asyncprocessing/RecordContext.java     |  23 ++-
 .../AsyncExecutionControllerTest.java              |  92 +++++++++
 .../ContextStateFutureImplTest.java                |   3 +-
 .../runtime/asyncprocessing/EpochManagerTest.java  |  61 ++++++
 .../state/forst/ForStDBOperationTestBase.java      |   4 +-
 .../flink/state/forst/ForStStateExecutorTest.java  |   4 +-
 .../api/operators/AbstractStreamOperatorV2.java    |   4 +-
 .../AbstractAsyncStateStreamOperator.java          |  23 +++
 .../AbstractAsyncStateStreamOperatorV2.java        |  31 +++
 11 files changed, 484 insertions(+), 12 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
index f33794e16c9..693ad8753f1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.common.state.v2.State;
 import org.apache.flink.core.state.InternalStateFuture;
 import org.apache.flink.core.state.StateFutureImpl.AsyncFrameworkExceptionHandler;
+import org.apache.flink.runtime.asyncprocessing.EpochManager.ParallelMode;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.ThrowingRunnable;
@@ -76,6 +77,9 @@ public class AsyncExecutionController<K> implements StateRequestHandler {
      */
     private final MailboxExecutor mailboxExecutor;
 
+    /** Exception handler to handle the exception thrown by asynchronous framework. */
+    private final AsyncFrameworkExceptionHandler exceptionHandler;
+
     /** The key accounting unit which is used to detect the key conflict. */
     final KeyAccountingUnit<K> keyAccountingUnit;
 
@@ -86,7 +90,7 @@ public class AsyncExecutionController<K> implements StateRequestHandler {
     private final StateFutureFactory<K> stateFutureFactory;
 
     /** The state executor where the {@link StateRequest} is actually executed. */
-    StateExecutor stateExecutor;
+    final StateExecutor stateExecutor;
 
     /** The corresponding context that currently runs in task thread. */
     RecordContext<K> currentContext;
@@ -102,6 +106,15 @@ public class AsyncExecutionController<K> implements StateRequestHandler {
     /** Max parallelism of the job. */
     private final int maxParallelism;
 
+    /** The reference of epoch manager. */
+    final EpochManager epochManager;
+
+    /**
+     * The parallel mode of epoch execution. Keep this field internal for now, until we could see
+     * the concrete need for {@link ParallelMode#PARALLEL_BETWEEN_EPOCH} from average users.
+     */
+    final ParallelMode epochParallelMode = ParallelMode.SERIAL_BETWEEN_EPOCH;
+
     public AsyncExecutionController(
             MailboxExecutor mailboxExecutor,
             AsyncFrameworkExceptionHandler exceptionHandler,
@@ -112,6 +125,7 @@ public class AsyncExecutionController<K> implements StateRequestHandler {
             int maxInFlightRecords) {
         this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
         this.mailboxExecutor = mailboxExecutor;
+        this.exceptionHandler = exceptionHandler;
         this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor, exceptionHandler);
         this.stateExecutor = stateExecutor;
         this.batchSize = batchSize;
@@ -131,11 +145,13 @@ public class AsyncExecutionController<K> implements StateRequestHandler {
                                         },
                                         "AEC-buffer-timeout"));
 
+        this.epochManager = new EpochManager(this);
         LOG.info(
-                "Create AsyncExecutionController: batchSize {}, bufferTimeout {}, maxInFlightRecordNum {}",
+                "Create AsyncExecutionController: batchSize {}, bufferTimeout {}, maxInFlightRecordNum {}, epochParallelMode {}",
                 this.batchSize,
                 this.bufferTimeout,
-                this.maxInFlightRecordNum);
+                this.maxInFlightRecordNum,
+                this.epochParallelMode);
     }
 
     /**
@@ -152,13 +168,15 @@ public class AsyncExecutionController<K> implements StateRequestHandler {
                     RecordContext.EMPTY_RECORD,
                     key,
                     this::disposeContext,
-                    KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism));
+                    KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism),
+                    epochManager.onRecord());
         }
         return new RecordContext<>(
                 record,
                 key,
                 this::disposeContext,
-                KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism));
+                KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism),
+                epochManager.onRecord());
     }
 
     /**
@@ -177,6 +195,7 @@ public class AsyncExecutionController<K> implements StateRequestHandler {
      * @param toDispose the context to dispose.
      */
     void disposeContext(RecordContext<K> toDispose) {
+        epochManager.completeOneRecord(toDispose.getEpoch());
         keyAccountingUnit.release(toDispose.getRecord(), toDispose.getKey());
         inFlightRecordNum.decrementAndGet();
         RecordContext<K> nextRecordCtx =
@@ -311,6 +330,18 @@ public class AsyncExecutionController<K> implements StateRequestHandler {
         }
     }
 
+    public void processNonRecord(ThrowingRunnable<? extends Exception> action) {
+        Runnable wrappedAction =
+                () -> {
+                    try {
+                        action.run();
+                    } catch (Exception e) {
+                        exceptionHandler.handleException("Failed to process non-record.", e);
+                    }
+                };
+        epochManager.onNonRecord(wrappedAction, epochParallelMode);
+    }
+
     @VisibleForTesting
     public StateExecutor getStateExecutor() {
         return stateExecutor;
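
In the controller above, buildContext() tags each incoming record with the currently open epoch (epochManager.onRecord()), disposeContext() reports the record back via completeOneRecord(), and processNonRecord() closes the epoch so the wrapped action only runs once all of that epoch's records are done. A minimal illustrative fragment (not part of this commit), assuming a controller `aec` and an async `valueState` handle set up the way the testEpochManager case further down sets them up:

    AtomicInteger output = new AtomicInteger(0);

    // A record enters: its context picks up the open epoch via epochManager.onRecord().
    RecordContext<String> ctx = aec.buildContext("key1-r1", "key1");
    aec.setCurrentContext(ctx);
    valueState.asyncValue().thenAccept(v -> output.incrementAndGet());

    // A non-record (e.g. a watermark) arrives: processNonRecord closes the epoch and, in the
    // default SERIAL_BETWEEN_EPOCH mode, drains in-flight records before running the action,
    // so the record callback above completes first and then the non-record action runs.
    aec.processNonRecord(() -> output.incrementAndGet());
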
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/EpochManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/EpochManager.java
new file mode 100644
index 00000000000..832d79904c7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/EpochManager.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.LinkedList;
+
+/**
+ * Epoch manager segments inputs into distinct epochs, marked by the arrival of non-records (e.g.
+ * watermark, record attributes). Records are assigned to a unique epoch based on their arrival;
+ * records within an epoch are allowed to be parallelized, while the non-record of an epoch can only
+ * be executed when all records in this epoch have finished.
+ *
+ * <p>For more details please refer to FLIP-425.
+ */
+public class EpochManager {
+    private static final Logger LOG = LoggerFactory.getLogger(EpochManager.class);
+
+    /**
+     * This enum defines whether parallel execution between epochs is allowed. We should keep this
+     * internal and away from API module for now, until we could see the concrete need for {@link
+     * #PARALLEL_BETWEEN_EPOCH} from average users.
+     */
+    public enum ParallelMode {
+        /**
+         * Subsequent epochs must wait until the previous epoch is completed before they can start.
+         */
+        SERIAL_BETWEEN_EPOCH,
+        /**
+         * Subsequent epochs can begin execution even if the previous epoch has not yet completed.
+         * Usually performs better than {@link #SERIAL_BETWEEN_EPOCH}.
+         */
+        PARALLEL_BETWEEN_EPOCH
+    }
+
+    /**
+     * The reference to the {@link AsyncExecutionController}, used for {@link
+     * ParallelMode#SERIAL_BETWEEN_EPOCH}. Can be null when testing.
+     */
+    final AsyncExecutionController<?> asyncExecutionController;
+
+    /** The number of epochs that have arrived. */
+    long epochNum;
+
+    /** The output queue to hold ongoing epochs. */
+    LinkedList<Epoch> outputQueue;
+
+    /** Current active epoch, only one active epoch at the same time. */
+    Epoch activeEpoch;
+
+    public EpochManager(AsyncExecutionController<?> aec) {
+        this.epochNum = 0;
+        this.outputQueue = new LinkedList<>();
+        this.asyncExecutionController = aec;
+        // init an empty epoch, the epoch action will be updated when non-record is received.
+        this.activeEpoch = new Epoch(epochNum++);
+    }
+
+    /**
+     * Add a record to the current epoch and return the current open epoch, the epoch will be
+     * associated with the {@link RecordContext} of this record. Must be invoked within task thread.
+     *
+     * @return the current open epoch.
+     */
+    public Epoch onRecord() {
+        activeEpoch.ongoingRecordCount++;
+        return activeEpoch;
+    }
+
+    /**
+     * Add a non-record to the current epoch, close current epoch and open a new epoch. Must be
+     * invoked within task thread.
+     *
+     * @param action the action associated with this non-record.
+     * @param parallelMode the parallel mode for this epoch.
+     */
+    public void onNonRecord(Runnable action, ParallelMode parallelMode) {
+        LOG.trace(
+                "on NonRecord, old epoch: {}, outputQueue size: {}",
+                activeEpoch,
+                outputQueue.size());
+        switchActiveEpoch(action);
+        if (parallelMode == ParallelMode.SERIAL_BETWEEN_EPOCH) {
+            asyncExecutionController.drainInflightRecords(0);
+        }
+    }
+
+    /**
+     * Complete one record in the specific epoch. Must be invoked within task thread.
+     *
+     * @param epoch the specific epoch
+     */
+    public void completeOneRecord(Epoch epoch) {
+        if (--epoch.ongoingRecordCount == 0) {
+            tryFinishInQueue();
+        }
+    }
+
+    private void tryFinishInQueue() {
+        // If one epoch has been closed before and all records in
+        // this epoch have finished, the epoch will be removed from the output queue.
+        while (!outputQueue.isEmpty() && outputQueue.peek().tryFinish()) {
+            LOG.trace(
+                    "Finish epoch: {}, outputQueue size: {}",
+                    outputQueue.peek(),
+                    outputQueue.size());
+            outputQueue.pop();
+        }
+    }
+
+    private void switchActiveEpoch(Runnable action) {
+        activeEpoch.close(action);
+        outputQueue.offer(activeEpoch);
+        this.activeEpoch = new Epoch(epochNum++);
+        tryFinishInQueue();
+    }
+
+    /** The status of an epoch, see Fig.6 in FLIP-425 for details. */
+    enum EpochStatus {
+        /**
+         * The subsequent non-record input has not arrived. So arriving records will be collected
+         * into current epoch.
+         */
+        OPEN,
+        /**
+         * The records belonging to this epoch are settled since the following non-record input has
+         * arrived; the newly arriving records would be collected into the next epoch.
+         */
+        CLOSED,
+        /**
+         * One epoch can only be finished when it meets the following three conditions. 1. The
+         * records of this epoch have finished execution. 2. The epoch is closed. 3. The epoch is in
+         * the front of outputQueue.
+         */
+        FINISHED
+    }
+
+    /**
+     * All inputs are segmented into distinct epochs, marked by the arrival of non-record inputs.
+     * Records are assigned to a unique epoch based on their arrival.
+     */
+    public static class Epoch {
+        /** The id of this epoch for easy debugging. */
+        long id;
+        /** The number of records that are still ongoing in this epoch. */
+        int ongoingRecordCount;
+
+        /** The action associated with non-record of this epoch (e.g. advance watermark). */
+        @Nullable Runnable action;
+
+        EpochStatus status;
+
+        public Epoch(long id) {
+            this.id = id;
+            this.ongoingRecordCount = 0;
+            this.status = EpochStatus.OPEN;
+            this.action = null;
+        }
+
+        /**
+         * Try to finish this epoch.
+         *
+         * @return whether this epoch has been finished.
+         */
+        boolean tryFinish() {
+            if (this.status == EpochStatus.FINISHED) {
+                return true;
+            }
+            if (ongoingRecordCount == 0 && this.status == EpochStatus.CLOSED) {
+                this.status = EpochStatus.FINISHED;
+                if (action != null) {
+                    action.run();
+                }
+                return true;
+            }
+            return false;
+        }
+
+        /** Close this epoch. */
+        void close(Runnable action) {
+            this.action = action;
+            this.status = EpochStatus.CLOSED;
+        }
+
+        public String toString() {
+            return String.format(
+                    "Epoch{id=%d, ongoingRecord=%d, status=%s}", id, ongoingRecordCount, status);
+        }
+    }
+}
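
EpochManager itself has no dependency on a running task: the constructor accepts a null controller for testing, so the lifecycle can be driven directly. A minimal, self-contained sketch (not part of this commit; the class name EpochLifecycleSketch is illustrative), mirroring what EpochManagerTest later in this patch exercises:

    import org.apache.flink.runtime.asyncprocessing.EpochManager;
    import org.apache.flink.runtime.asyncprocessing.EpochManager.Epoch;
    import org.apache.flink.runtime.asyncprocessing.EpochManager.ParallelMode;

    public class EpochLifecycleSketch {
        public static void main(String[] args) {
            EpochManager epochManager = new EpochManager(null);

            // Two records arrive and join the same open epoch.
            Epoch epoch = epochManager.onRecord();
            epochManager.onRecord();

            // A non-record arrives: the epoch is closed and its action is queued behind it.
            epochManager.onNonRecord(
                    () -> System.out.println("non-record action runs"),
                    ParallelMode.PARALLEL_BETWEEN_EPOCH);

            // The action fires only once every record of the closed epoch has completed.
            epochManager.completeOneRecord(epoch);
            epochManager.completeOneRecord(epoch); // prints "non-record action runs"
        }
    }
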
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java
index 82ae4f07155..86352ce1cf9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.asyncprocessing;
 
+import org.apache.flink.runtime.asyncprocessing.EpochManager.Epoch;
+
 import javax.annotation.Nullable;
 
 import java.util.Objects;
@@ -46,7 +48,7 @@ public class RecordContext<K> extends ReferenceCounted<RecordContext.DisposerRun
 
     /**
      * The disposer for disposing this context. This should be invoked in {@link
-     * #referenceCountReachedZero()}, which may be called once the ref count reaches zero in any
+     * #referenceCountReachedZero}, which may be called once the ref count reaches zero in any
      * thread.
      */
     private final Consumer<RecordContext<K>> disposer;
@@ -61,13 +63,18 @@ public class RecordContext<K> extends ReferenceCounted<RecordContext.DisposerRun
      */
     private @Nullable volatile Object extra;
 
-    public RecordContext(Object record, K key, Consumer<RecordContext<K>> disposer, int keyGroup) {
+    /** The epoch of this context. */
+    private final Epoch epoch;
+
+    public RecordContext(
+            Object record, K key, Consumer<RecordContext<K>> disposer, int keyGroup, Epoch epoch) {
         super(0);
         this.record = record;
         this.key = key;
         this.keyOccupied = false;
         this.disposer = disposer;
         this.keyGroup = keyGroup;
+        this.epoch = epoch;
     }
 
     public Object getRecord() {
@@ -112,6 +119,10 @@ public class RecordContext<K> extends ReferenceCounted<RecordContext.DisposerRun
         return extra;
     }
 
+    public Epoch getEpoch() {
+        return epoch;
+    }
+
     @Override
     public int hashCode() {
         return Objects.hash(record, key);
@@ -129,6 +140,12 @@ public class RecordContext<K> extends ReferenceCounted<RecordContext.DisposerRun
         if (!Objects.equals(record, that.record)) {
             return false;
         }
+        if (!Objects.equals(keyGroup, that.keyGroup)) {
+            return false;
+        }
+        if (!Objects.equals(epoch, that.epoch)) {
+            return false;
+        }
         return Objects.equals(key, that.key);
     }
 
@@ -143,6 +160,8 @@ public class RecordContext<K> extends ReferenceCounted<RecordContext.DisposerRun
                 + keyOccupied
                 + ", ref="
                 + getReferenceCount()
+                + ", epoch="
+                + epoch.id
                 + "}";
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
index 09de8dfb117..c40edbf3940 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
@@ -24,6 +24,8 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.core.state.StateFutureImpl.AsyncFrameworkExceptionHandler;
 import org.apache.flink.core.state.StateFutureUtils;
+import org.apache.flink.runtime.asyncprocessing.EpochManager.Epoch;
+import org.apache.flink.runtime.asyncprocessing.EpochManager.ParallelMode;
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
 import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
 import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
@@ -588,6 +590,96 @@ class AsyncExecutionControllerTest {
                 .isEqualTo("Caught exception when submitting StateFuture's callback.");
     }
 
+    @Test
+    void testEpochManager() {
+        setup(
+                1000,
+                10000,
+                6000,
+                new SyncMailboxExecutor(),
+                new TestAsyncFrameworkExceptionHandler());
+        AtomicInteger output = new AtomicInteger(0);
+        Runnable userCode = () -> valueState.asyncValue().thenAccept(v -> output.incrementAndGet());
+
+        String record1 = "key1-r1";
+        String key1 = "key1";
+        RecordContext<String> recordContext1 = aec.buildContext(record1, key1);
+        Epoch epoch1 = recordContext1.getEpoch();
+        aec.setCurrentContext(recordContext1);
+        userCode.run();
+
+        String record2 = "key2-r2";
+        String key2 = "key2";
+        RecordContext<String> recordContext2 = aec.buildContext(record2, key2);
+        Epoch epoch2 = recordContext2.getEpoch();
+        aec.setCurrentContext(recordContext2);
+        userCode.run();
+
+        assertThat(epoch1).isEqualTo(epoch2);
+        assertThat(epoch1.ongoingRecordCount).isEqualTo(2);
+        aec.processNonRecord(() -> output.incrementAndGet());
+
+        assertThat(output.get()).isEqualTo(3);
+        // SERIAL_BETWEEN_EPOCH mode would drain in-flight records on non-record arriving.
+        assertThat(epoch1.ongoingRecordCount).isEqualTo(0);
+    }
+
+    @Test
+    void testMixEpochMode() {
+        // epoch1 (parallel mode) -> epoch2 (parallel mode) -> epoch3 (serial mode);
+        // when epoch2 closes, epoch1 is still in-flight.
+        // when epoch3 closes, all in-flight records should drain, and epoch1 and epoch2 should finish.
+        setup(
+                1000,
+                10000,
+                6000,
+                new SyncMailboxExecutor(),
+                new TestAsyncFrameworkExceptionHandler());
+        AtomicInteger output = new AtomicInteger(0);
+        Runnable userCode = () -> valueState.asyncValue().thenAccept(v -> output.incrementAndGet());
+
+        String record1 = "key1-r1";
+        String key1 = "key1";
+        RecordContext<String> recordContext1 = aec.buildContext(record1, key1);
+        Epoch epoch1 = recordContext1.getEpoch();
+        aec.setCurrentContext(recordContext1);
+        userCode.run();
+
+        aec.epochManager.onNonRecord(
+                () -> output.incrementAndGet(), ParallelMode.PARALLEL_BETWEEN_EPOCH);
+        assertThat(epoch1.ongoingRecordCount).isEqualTo(1);
+
+        String record2 = "key2-r2";
+        String key2 = "key2";
+        RecordContext<String> recordContext2 = aec.buildContext(record2, key2);
+        Epoch epoch2 = recordContext2.getEpoch();
+        aec.setCurrentContext(recordContext2);
+        userCode.run();
+        assertThat(epoch1.ongoingRecordCount).isEqualTo(1);
+        assertThat(epoch2.ongoingRecordCount).isEqualTo(1);
+        aec.epochManager.onNonRecord(
+                () -> output.incrementAndGet(), ParallelMode.PARALLEL_BETWEEN_EPOCH);
+        assertThat(epoch1.ongoingRecordCount).isEqualTo(1);
+        assertThat(epoch2.ongoingRecordCount).isEqualTo(1);
+        assertThat(output.get()).isEqualTo(0);
+
+        String record3 = "key3-r3";
+        String key3 = "key3";
+        RecordContext<String> recordContext3 = aec.buildContext(record3, key3);
+        Epoch epoch3 = recordContext3.getEpoch();
+        aec.setCurrentContext(recordContext3);
+        userCode.run();
+        assertThat(epoch1.ongoingRecordCount).isEqualTo(1);
+        assertThat(epoch2.ongoingRecordCount).isEqualTo(1);
+        assertThat(epoch3.ongoingRecordCount).isEqualTo(1);
+        aec.epochManager.onNonRecord(
+                () -> output.incrementAndGet(), ParallelMode.SERIAL_BETWEEN_EPOCH);
+        assertThat(epoch1.ongoingRecordCount).isEqualTo(0);
+        assertThat(epoch2.ongoingRecordCount).isEqualTo(0);
+        assertThat(epoch3.ongoingRecordCount).isEqualTo(0);
+        assertThat(output.get()).isEqualTo(6);
+    }
+
     /** Simulate the underlying state that is actually used to execute the request. */
     static class TestUnderlyingState {
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImplTest.java
index 811fc95cae5..5d8f81c440e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImplTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.asyncprocessing;
 
 import org.apache.flink.core.state.StateFutureImpl.AsyncFrameworkExceptionHandler;
 import org.apache.flink.core.state.StateFutureUtils;
+import org.apache.flink.runtime.asyncprocessing.EpochManager.Epoch;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -306,7 +307,7 @@ public class ContextStateFutureImplTest {
 
     private <K> RecordContext<K> buildRecordContext(Object record, K key) {
         int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, 128);
-        return new RecordContext<>(record, key, (e) -> {}, keyGroup);
+        return new RecordContext<>(record, key, (e) -> {}, keyGroup, new Epoch(0));
     }
 
     /** A runner that performs single-step debugging. */
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/EpochManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/EpochManagerTest.java
new file mode 100644
index 00000000000..361d543776e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/EpochManagerTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing;
+
+import org.apache.flink.runtime.asyncprocessing.EpochManager.Epoch;
+import org.apache.flink.runtime.asyncprocessing.EpochManager.EpochStatus;
+import org.apache.flink.runtime.asyncprocessing.EpochManager.ParallelMode;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** Test for Epoch Manager. */
+class EpochManagerTest {
+    @Test
+    void testBasic() {
+        EpochManager epochManager = new EpochManager(null);
+        Epoch epoch1 = epochManager.onRecord();
+        Epoch epoch2 = epochManager.onRecord();
+        assertThat(epoch1).isEqualTo(epoch2);
+        assertThat(epoch1.ongoingRecordCount).isEqualTo(2);
+        AtomicInteger output = new AtomicInteger(0);
+        epochManager.onNonRecord(
+                () -> output.incrementAndGet(), ParallelMode.PARALLEL_BETWEEN_EPOCH);
+        // record3 is in a new epoch
+        Epoch epoch3 = epochManager.onRecord();
+        assertThat(epoch3).isNotEqualTo(epoch1);
+        assertThat(epochManager.outputQueue.size()).isEqualTo(1);
+        assertThat(epochManager.outputQueue.peek().status).isEqualTo(EpochStatus.CLOSED);
+        // records in first epoch are not finished, so the non-record action is not executed
+        assertThat(output.get()).isEqualTo(0);
+        epochManager.completeOneRecord(epoch1);
+        epochManager.completeOneRecord(epoch2);
+        epochManager.completeOneRecord(epoch3);
+        // records in first epoch are finished, so the non-record action is executed
+        assertThat(output.get()).isEqualTo(1);
+        // records in second epoch are finished, but no newly arrived non-record, so the second
+        // epoch is still open
+        assertThat(epochManager.outputQueue.size()).isEqualTo(0);
+        assertThat(epochManager.activeEpoch.ongoingRecordCount).isEqualTo(0);
+        assertThat(epochManager.activeEpoch.status).isEqualTo(EpochStatus.OPEN);
+    }
+}
diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java
index 7c6887a2a2d..3d31737beb0 100644
--- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java
+++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.core.state.InternalStateFuture;
+import org.apache.flink.runtime.asyncprocessing.EpochManager.Epoch;
 import org.apache.flink.runtime.asyncprocessing.RecordContext;
 import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
 import org.apache.flink.runtime.asyncprocessing.StateRequestType;
@@ -87,7 +88,8 @@ public class ForStDBOperationTestBase {
 
     protected ContextKey<Integer> buildContextKey(int i) {
         int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(i, 128);
-        RecordContext<Integer> recordContext = new RecordContext<>(i, i, t -> {}, keyGroup);
+        RecordContext<Integer> recordContext =
+                new RecordContext<>(i, i, t -> {}, keyGroup, new Epoch(0));
         return new ContextKey<>(recordContext);
     }
 
diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java
index 3e17f30d34b..8c597ec1167 100644
--- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java
+++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.state.forst;
 
+import org.apache.flink.runtime.asyncprocessing.EpochManager.Epoch;
 import org.apache.flink.runtime.asyncprocessing.RecordContext;
 import org.apache.flink.runtime.asyncprocessing.StateRequest;
 import org.apache.flink.runtime.asyncprocessing.StateRequestContainer;
@@ -127,7 +128,8 @@ public class ForStStateExecutorTest extends ForStDBOperationTestBase {
             V value,
             R record) {
         int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, 128);
-        RecordContext<K> recordContext = new RecordContext<>(record, key, t -> {}, keyGroup);
+        RecordContext<K> recordContext =
+                new RecordContext<>(record, key, t -> {}, keyGroup, new Epoch(0));
         TestStateFuture stateFuture = new TestStateFuture<>();
         return new StateRequest<>(innerTable, requestType, value, stateFuture, recordContext);
     }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
index 20441bcffa5..605186d772e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
@@ -93,7 +93,7 @@ public abstract class AbstractStreamOperatorV2<OUT>
     private final ExecutionConfig executionConfig;
     private final ClassLoader userCodeClassLoader;
     private final CloseableRegistry cancelables;
-    private final IndexedCombinedWatermarkStatus combinedWatermark;
+    protected final IndexedCombinedWatermarkStatus combinedWatermark;
 
     /** Metric group for the operator. */
     protected final InternalOperatorMetricGroup metrics;
@@ -493,7 +493,7 @@ public abstract class AbstractStreamOperatorV2<OUT>
         }
     }
 
-    public final void processWatermarkStatus(WatermarkStatus watermarkStatus, int inputId)
+    public void processWatermarkStatus(WatermarkStatus watermarkStatus, int inputId)
             throws Exception {
         boolean wasIdle = combinedWatermark.isIdle();
         if (combinedWatermark.updateStatus(inputId - 1, watermarkStatus.isIdle())) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java
index 6c1598c96c8..a87a9049583 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java
@@ -39,8 +39,10 @@ import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
 import org.apache.flink.streaming.api.operators.Triggerable;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.util.function.ThrowingConsumer;
 import org.apache.flink.util.function.ThrowingRunnable;
 
@@ -259,6 +261,27 @@ public abstract class AbstractAsyncStateStreamOperator<OUT> extends AbstractStre
         return currentProcessingContext.getKey();
     }
 
+    @Override
+    public void processWatermark(Watermark mark) throws Exception {
+        if (!isAsyncStateProcessingEnabled()) {
+            // If async state processing is disabled, fallback to the super class.
+            super.processWatermark(mark);
+            return;
+        }
+        asyncExecutionController.processNonRecord(() -> super.processWatermark(mark));
+    }
+
+    @Override
+    public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
+        if (!isAsyncStateProcessingEnabled()) {
+            // If async state processing is disabled, fallback to the super class.
+            super.processWatermarkStatus(watermarkStatus);
+            return;
+        }
+        asyncExecutionController.processNonRecord(
+                () -> super.processWatermarkStatus(watermarkStatus));
+    }
+
     @VisibleForTesting
     AsyncExecutionController<?> getAsyncExecutionController() {
         return asyncExecutionController;
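
Both operator classes in this patch route non-record inputs through the same guard-then-delegate shape: if async state processing is disabled, the inherited handling runs as before; otherwise the call is handed to AsyncExecutionController#processNonRecord so it executes only after the current epoch's records finish. A free-standing sketch of that shape (the helper and its parameter names are illustrative, not part of the commit):

    static void dispatchNonRecord(
            boolean isAsyncEnabled,
            AsyncExecutionController<?> aec,
            ThrowingRunnable<? extends Exception> action) throws Exception {
        if (!isAsyncEnabled) {
            // Synchronous state: run the inherited handling directly on the task thread.
            action.run();
            return;
        }
        // Async state: close the current epoch; the action runs once its records finish.
        aec.processNonRecord(action);
    }
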
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java
index 92a41eaeb06..a142a59ccac 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java
@@ -37,7 +37,9 @@ import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
 import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
 import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.util.function.ThrowingConsumer;
 import org.apache.flink.util.function.ThrowingRunnable;
 
@@ -188,6 +190,35 @@ public abstract class AbstractAsyncStateStreamOperatorV2<OUT> extends AbstractSt
                 (AsyncExecutionController<K>) asyncExecutionController);
     }
 
+    @Override
+    public void processWatermark(Watermark mark) throws Exception {
+        if (!isAsyncStateProcessingEnabled()) {
+            super.processWatermark(mark);
+            return;
+        }
+        asyncExecutionController.processNonRecord(() -> super.processWatermark(mark));
+    }
+
+    @Override
+    public void processWatermarkStatus(WatermarkStatus watermarkStatus, int inputId)
+            throws Exception {
+        if (!isAsyncStateProcessingEnabled()) {
+            super.processWatermarkStatus(watermarkStatus, inputId);
+            return;
+        }
+        asyncExecutionController.processNonRecord(
+                () -> {
+                    boolean wasIdle = combinedWatermark.isIdle();
+                    if (combinedWatermark.updateStatus(inputId - 1, watermarkStatus.isIdle())) {
+                        super.processWatermark(
+                                new Watermark(combinedWatermark.getCombinedWatermark()));
+                    }
+                    if (wasIdle != combinedWatermark.isIdle()) {
+                        output.emitWatermarkStatus(watermarkStatus);
+                    }
+                });
+    }
+
     @VisibleForTesting
     AsyncExecutionController<?> getAsyncExecutionController() {
         return asyncExecutionController;

