This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new f1ecb9e4701 [FLINK-35030][runtime] Introduce Epoch Manager for under async execution (#24748) f1ecb9e4701 is described below commit f1ecb9e4701d612050da54589a8f561857debf34 Author: Yanfei Lei <fredia...@gmail.com> AuthorDate: Fri May 17 13:24:38 2024 +0800 [FLINK-35030][runtime] Introduce Epoch Manager for under async execution (#24748) --- .../asyncprocessing/AsyncExecutionController.java | 41 +++- .../runtime/asyncprocessing/EpochManager.java | 210 +++++++++++++++++++++ .../runtime/asyncprocessing/RecordContext.java | 23 ++- .../AsyncExecutionControllerTest.java | 92 +++++++++ .../ContextStateFutureImplTest.java | 3 +- .../runtime/asyncprocessing/EpochManagerTest.java | 61 ++++++ .../state/forst/ForStDBOperationTestBase.java | 4 +- .../flink/state/forst/ForStStateExecutorTest.java | 4 +- .../api/operators/AbstractStreamOperatorV2.java | 4 +- .../AbstractAsyncStateStreamOperator.java | 23 +++ .../AbstractAsyncStateStreamOperatorV2.java | 31 +++ 11 files changed, 484 insertions(+), 12 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java index f33794e16c9..693ad8753f1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.operators.MailboxExecutor; import org.apache.flink.api.common.state.v2.State; import org.apache.flink.core.state.InternalStateFuture; import org.apache.flink.core.state.StateFutureImpl.AsyncFrameworkExceptionHandler; +import org.apache.flink.runtime.asyncprocessing.EpochManager.ParallelMode; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import 
org.apache.flink.util.Preconditions; import org.apache.flink.util.function.ThrowingRunnable; @@ -76,6 +77,9 @@ public class AsyncExecutionController<K> implements StateRequestHandler { */ private final MailboxExecutor mailboxExecutor; + /** Exception handler to handle the exception thrown by asynchronous framework. */ + private final AsyncFrameworkExceptionHandler exceptionHandler; + /** The key accounting unit which is used to detect the key conflict. */ final KeyAccountingUnit<K> keyAccountingUnit; @@ -86,7 +90,7 @@ public class AsyncExecutionController<K> implements StateRequestHandler { private final StateFutureFactory<K> stateFutureFactory; /** The state executor where the {@link StateRequest} is actually executed. */ - StateExecutor stateExecutor; + final StateExecutor stateExecutor; /** The corresponding context that currently runs in task thread. */ RecordContext<K> currentContext; @@ -102,6 +106,15 @@ public class AsyncExecutionController<K> implements StateRequestHandler { /** Max parallelism of the job. */ private final int maxParallelism; + /** The reference of epoch manager. */ + final EpochManager epochManager; + + /** + * The parallel mode of epoch execution. Keep this field internal for now, until we could see + * the concrete need for {@link ParallelMode#PARALLEL_BETWEEN_EPOCH} from average users. 
+ */ + final ParallelMode epochParallelMode = ParallelMode.SERIAL_BETWEEN_EPOCH; + public AsyncExecutionController( MailboxExecutor mailboxExecutor, AsyncFrameworkExceptionHandler exceptionHandler, @@ -112,6 +125,7 @@ public class AsyncExecutionController<K> implements StateRequestHandler { int maxInFlightRecords) { this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords); this.mailboxExecutor = mailboxExecutor; + this.exceptionHandler = exceptionHandler; this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor, exceptionHandler); this.stateExecutor = stateExecutor; this.batchSize = batchSize; @@ -131,11 +145,13 @@ public class AsyncExecutionController<K> implements StateRequestHandler { }, "AEC-buffer-timeout")); + this.epochManager = new EpochManager(this); LOG.info( - "Create AsyncExecutionController: batchSize {}, bufferTimeout {}, maxInFlightRecordNum {}", + "Create AsyncExecutionController: batchSize {}, bufferTimeout {}, maxInFlightRecordNum {}, epochParallelMode {}", this.batchSize, this.bufferTimeout, - this.maxInFlightRecordNum); + this.maxInFlightRecordNum, + this.epochParallelMode); } /** @@ -152,13 +168,15 @@ public class AsyncExecutionController<K> implements StateRequestHandler { RecordContext.EMPTY_RECORD, key, this::disposeContext, - KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism)); + KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism), + epochManager.onRecord()); } return new RecordContext<>( record, key, this::disposeContext, - KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism)); + KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism), + epochManager.onRecord()); } /** @@ -177,6 +195,7 @@ public class AsyncExecutionController<K> implements StateRequestHandler { * @param toDispose the context to dispose. 
*/ void disposeContext(RecordContext<K> toDispose) { + epochManager.completeOneRecord(toDispose.getEpoch()); keyAccountingUnit.release(toDispose.getRecord(), toDispose.getKey()); inFlightRecordNum.decrementAndGet(); RecordContext<K> nextRecordCtx = @@ -311,6 +330,18 @@ public class AsyncExecutionController<K> implements StateRequestHandler { } } + public void processNonRecord(ThrowingRunnable<? extends Exception> action) { + Runnable wrappedAction = + () -> { + try { + action.run(); + } catch (Exception e) { + exceptionHandler.handleException("Failed to process non-record.", e); + } + }; + epochManager.onNonRecord(wrappedAction, epochParallelMode); + } + @VisibleForTesting public StateExecutor getStateExecutor() { return stateExecutor; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/EpochManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/EpochManager.java new file mode 100644 index 00000000000..832d79904c7 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/EpochManager.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.asyncprocessing; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.LinkedList; + +/** + * Epoch manager segments inputs into distinct epochs, marked by the arrival of non-records(e.g. + * watermark, record attributes). Records are assigned to a unique epoch based on their arrival, + * records within an epoch are allowed to be parallelized, while the non-record of an epoch can only + * be executed when all records in this epoch have finished. + * + * <p>For more details please refer to FLIP-425. + */ +public class EpochManager { + private static final Logger LOG = LoggerFactory.getLogger(EpochManager.class); + + /** + * This enum defines whether parallel execution between epochs is allowed. We should keep this + * internal and away from API module for now, until we could see the concrete need for {@link + * #PARALLEL_BETWEEN_EPOCH} from average users. + */ + public enum ParallelMode { + /** + * Subsequent epochs must wait until the previous epoch is completed before they can start. + */ + SERIAL_BETWEEN_EPOCH, + /** + * Subsequent epochs can begin execution even if the previous epoch has not yet completed. + * Usually performs better than {@link #SERIAL_BETWEEN_EPOCH}. + */ + PARALLEL_BETWEEN_EPOCH + } + + /** + * The reference to the {@link AsyncExecutionController}, used for {@link + * ParallelMode#SERIAL_BETWEEN_EPOCH}. Can be null when testing. + */ + final AsyncExecutionController<?> asyncExecutionController; + + /** The number of epochs that have arrived. */ + long epochNum; + + /** The output queue to hold ongoing epochs. */ + LinkedList<Epoch> outputQueue; + + /** Current active epoch, only one active epoch at the same time. 
*/ + Epoch activeEpoch; + + public EpochManager(AsyncExecutionController<?> aec) { + this.epochNum = 0; + this.outputQueue = new LinkedList<>(); + this.asyncExecutionController = aec; + // init an empty epoch, the epoch action will be updated when non-record is received. + this.activeEpoch = new Epoch(epochNum++); + } + + /** + * Add a record to the current epoch and return the current open epoch, the epoch will be + * associated with the {@link RecordContext} of this record. Must be invoked within task thread. + * + * @return the current open epoch. + */ + public Epoch onRecord() { + activeEpoch.ongoingRecordCount++; + return activeEpoch; + } + + /** + * Add a non-record to the current epoch, close current epoch and open a new epoch. Must be + * invoked within task thread. + * + * @param action the action associated with this non-record. + * @param parallelMode the parallel mode for this epoch. + */ + public void onNonRecord(Runnable action, ParallelMode parallelMode) { + LOG.trace( + "on NonRecord, old epoch: {}, outputQueue size: {}", + activeEpoch, + outputQueue.size()); + switchActiveEpoch(action); + if (parallelMode == ParallelMode.SERIAL_BETWEEN_EPOCH) { + asyncExecutionController.drainInflightRecords(0); + } + } + + /** + * Complete one record in the specific epoch. Must be invoked within task thread. + * + * @param epoch the specific epoch + */ + public void completeOneRecord(Epoch epoch) { + if (--epoch.ongoingRecordCount == 0) { + tryFinishInQueue(); + } + } + + private void tryFinishInQueue() { + // If one epoch has been closed before and all records in + // this epoch have finished, the epoch will be removed from the output queue. 
+ while (!outputQueue.isEmpty() && outputQueue.peek().tryFinish()) { + LOG.trace( + "Finish epoch: {}, outputQueue size: {}", + outputQueue.peek(), + outputQueue.size()); + outputQueue.pop(); + } + } + + private void switchActiveEpoch(Runnable action) { + activeEpoch.close(action); + outputQueue.offer(activeEpoch); + this.activeEpoch = new Epoch(epochNum++); + tryFinishInQueue(); + } + + /** The status of an epoch, see Fig.6 in FLIP-425 for details. */ + enum EpochStatus { + /** + * The subsequent non-record input has not arrived. So arriving records will be collected + * into current epoch. + */ + OPEN, + /** + * The records belonging to this epoch are settled since the following non-record input has + * arrived, the newly arriving records would be collected into the next epoch. + */ + CLOSED, + /** + * One epoch can only be finished when it meets the following three conditions. 1. The + * records of this epoch have finished execution. 2. The epoch is closed. 3. The epoch is in + * the front of outputQueue. + */ + FINISHED + } + + /** + * All inputs are segmented into distinct epochs, marked by the arrival of non-record inputs. + * Records are assigned to a unique epoch based on their arrival. + */ + public static class Epoch { + /** The id of this epoch for easy debugging. */ + long id; + /** The number of records that are still ongoing in this epoch. */ + int ongoingRecordCount; + + /** The action associated with non-record of this epoch(e.g. advance watermark). */ + @Nullable Runnable action; + + EpochStatus status; + + public Epoch(long id) { + this.id = id; + this.ongoingRecordCount = 0; + this.status = EpochStatus.OPEN; + this.action = null; + } + + /** + * Try to finish this epoch. + * + * @return whether this epoch has been finished. 
+ */ + boolean tryFinish() { + if (this.status == EpochStatus.FINISHED) { + return true; + } + if (ongoingRecordCount == 0 && this.status == EpochStatus.CLOSED) { + this.status = EpochStatus.FINISHED; + if (action != null) { + action.run(); + } + return true; + } + return false; + } + + /** Close this epoch. */ + void close(Runnable action) { + this.action = action; + this.status = EpochStatus.CLOSED; + } + + public String toString() { + return String.format( + "Epoch{id=%d, ongoingRecord=%d, status=%s}", id, ongoingRecordCount, status); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java index 82ae4f07155..86352ce1cf9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.asyncprocessing; +import org.apache.flink.runtime.asyncprocessing.EpochManager.Epoch; + import javax.annotation.Nullable; import java.util.Objects; @@ -46,7 +48,7 @@ public class RecordContext<K> extends ReferenceCounted<RecordContext.DisposerRun /** * The disposer for disposing this context. This should be invoked in {@link - * #referenceCountReachedZero()}, which may be called once the ref count reaches zero in any + * #referenceCountReachedZero}, which may be called once the ref count reaches zero in any * thread. */ private final Consumer<RecordContext<K>> disposer; @@ -61,13 +63,18 @@ public class RecordContext<K> extends ReferenceCounted<RecordContext.DisposerRun */ private @Nullable volatile Object extra; - public RecordContext(Object record, K key, Consumer<RecordContext<K>> disposer, int keyGroup) { + /** The epoch of this context. 
*/ + private final Epoch epoch; + + public RecordContext( + Object record, K key, Consumer<RecordContext<K>> disposer, int keyGroup, Epoch epoch) { super(0); this.record = record; this.key = key; this.keyOccupied = false; this.disposer = disposer; this.keyGroup = keyGroup; + this.epoch = epoch; } public Object getRecord() { @@ -112,6 +119,10 @@ public class RecordContext<K> extends ReferenceCounted<RecordContext.DisposerRun return extra; } + public Epoch getEpoch() { + return epoch; + } + @Override public int hashCode() { return Objects.hash(record, key); @@ -129,6 +140,12 @@ public class RecordContext<K> extends ReferenceCounted<RecordContext.DisposerRun if (!Objects.equals(record, that.record)) { return false; } + if (!Objects.equals(keyGroup, that.keyGroup)) { + return false; + } + if (!Objects.equals(epoch, that.epoch)) { + return false; + } return Objects.equals(key, that.key); } @@ -143,6 +160,8 @@ public class RecordContext<K> extends ReferenceCounted<RecordContext.DisposerRun + keyOccupied + ", ref=" + getReferenceCount() + + ", epoch=" + + epoch.id + "}"; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java index 09de8dfb117..c40edbf3940 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java @@ -24,6 +24,8 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.core.state.StateFutureImpl.AsyncFrameworkExceptionHandler; import org.apache.flink.core.state.StateFutureUtils; +import org.apache.flink.runtime.asyncprocessing.EpochManager.Epoch; +import org.apache.flink.runtime.asyncprocessing.EpochManager.ParallelMode; import 
org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService; import org.apache.flink.runtime.mailbox.SyncMailboxExecutor; import org.apache.flink.runtime.state.AsyncKeyedStateBackend; @@ -588,6 +590,96 @@ class AsyncExecutionControllerTest { .isEqualTo("Caught exception when submitting StateFuture's callback."); } + @Test + void testEpochManager() { + setup( + 1000, + 10000, + 6000, + new SyncMailboxExecutor(), + new TestAsyncFrameworkExceptionHandler()); + AtomicInteger output = new AtomicInteger(0); + Runnable userCode = () -> valueState.asyncValue().thenAccept(v -> output.incrementAndGet()); + + String record1 = "key1-r1"; + String key1 = "key1"; + RecordContext<String> recordContext1 = aec.buildContext(record1, key1); + Epoch epoch1 = recordContext1.getEpoch(); + aec.setCurrentContext(recordContext1); + userCode.run(); + + String record2 = "key2-r2"; + String key2 = "key2"; + RecordContext<String> recordContext2 = aec.buildContext(record2, key2); + Epoch epoch2 = recordContext2.getEpoch(); + aec.setCurrentContext(recordContext2); + userCode.run(); + + assertThat(epoch1).isEqualTo(epoch2); + assertThat(epoch1.ongoingRecordCount).isEqualTo(2); + aec.processNonRecord(() -> output.incrementAndGet()); + + assertThat(output.get()).isEqualTo(3); + // SERIAL_BETWEEN_EPOCH mode would drain in-flight records on non-record arriving. + assertThat(epoch1.ongoingRecordCount).isEqualTo(0); + } + + @Test + void testMixEpochMode() { + // epoch1(parallel mode) -> epoch2(parallel mode) -> epoch3(serial mode), + // when epoch2 close, epoch1 is still in-flight. + // when epoch3 close, all in-flight records should drain, epoch1 and epoch2 should finish. 
+ setup( + 1000, + 10000, + 6000, + new SyncMailboxExecutor(), + new TestAsyncFrameworkExceptionHandler()); + AtomicInteger output = new AtomicInteger(0); + Runnable userCode = () -> valueState.asyncValue().thenAccept(v -> output.incrementAndGet()); + + String record1 = "key1-r1"; + String key1 = "key1"; + RecordContext<String> recordContext1 = aec.buildContext(record1, key1); + Epoch epoch1 = recordContext1.getEpoch(); + aec.setCurrentContext(recordContext1); + userCode.run(); + + aec.epochManager.onNonRecord( + () -> output.incrementAndGet(), ParallelMode.PARALLEL_BETWEEN_EPOCH); + assertThat(epoch1.ongoingRecordCount).isEqualTo(1); + + String record2 = "key2-r2"; + String key2 = "key2"; + RecordContext<String> recordContext2 = aec.buildContext(record2, key2); + Epoch epoch2 = recordContext2.getEpoch(); + aec.setCurrentContext(recordContext2); + userCode.run(); + assertThat(epoch1.ongoingRecordCount).isEqualTo(1); + assertThat(epoch2.ongoingRecordCount).isEqualTo(1); + aec.epochManager.onNonRecord( + () -> output.incrementAndGet(), ParallelMode.PARALLEL_BETWEEN_EPOCH); + assertThat(epoch1.ongoingRecordCount).isEqualTo(1); + assertThat(epoch2.ongoingRecordCount).isEqualTo(1); + assertThat(output.get()).isEqualTo(0); + + String record3 = "key3-r3"; + String key3 = "key3"; + RecordContext<String> recordContext3 = aec.buildContext(record3, key3); + Epoch epoch3 = recordContext3.getEpoch(); + aec.setCurrentContext(recordContext3); + userCode.run(); + assertThat(epoch1.ongoingRecordCount).isEqualTo(1); + assertThat(epoch2.ongoingRecordCount).isEqualTo(1); + assertThat(epoch3.ongoingRecordCount).isEqualTo(1); + aec.epochManager.onNonRecord( + () -> output.incrementAndGet(), ParallelMode.SERIAL_BETWEEN_EPOCH); + assertThat(epoch1.ongoingRecordCount).isEqualTo(0); + assertThat(epoch2.ongoingRecordCount).isEqualTo(0); + assertThat(epoch3.ongoingRecordCount).isEqualTo(0); + assertThat(output.get()).isEqualTo(6); + } + /** Simulate the underlying state that is actually used 
to execute the request. */ static class TestUnderlyingState { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImplTest.java index 811fc95cae5..5d8f81c440e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImplTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.asyncprocessing; import org.apache.flink.core.state.StateFutureImpl.AsyncFrameworkExceptionHandler; import org.apache.flink.core.state.StateFutureUtils; +import org.apache.flink.runtime.asyncprocessing.EpochManager.Epoch; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.util.FlinkException; import org.apache.flink.util.FlinkRuntimeException; @@ -306,7 +307,7 @@ public class ContextStateFutureImplTest { private <K> RecordContext<K> buildRecordContext(Object record, K key) { int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, 128); - return new RecordContext<>(record, key, (e) -> {}, keyGroup); + return new RecordContext<>(record, key, (e) -> {}, keyGroup, new Epoch(0)); } /** A runner that performs single-step debugging. */ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/EpochManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/EpochManagerTest.java new file mode 100644 index 00000000000..361d543776e --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/EpochManagerTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.asyncprocessing; + +import org.apache.flink.runtime.asyncprocessing.EpochManager.Epoch; +import org.apache.flink.runtime.asyncprocessing.EpochManager.EpochStatus; +import org.apache.flink.runtime.asyncprocessing.EpochManager.ParallelMode; + +import org.junit.jupiter.api.Test; + +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +/** Test for Epoch Manager. 
*/ +class EpochManagerTest { + @Test + void testBasic() { + EpochManager epochManager = new EpochManager(null); + Epoch epoch1 = epochManager.onRecord(); + Epoch epoch2 = epochManager.onRecord(); + assertThat(epoch1).isEqualTo(epoch2); + assertThat(epoch1.ongoingRecordCount).isEqualTo(2); + AtomicInteger output = new AtomicInteger(0); + epochManager.onNonRecord( + () -> output.incrementAndGet(), ParallelMode.PARALLEL_BETWEEN_EPOCH); + // record3 is in a new epoch + Epoch epoch3 = epochManager.onRecord(); + assertThat(epoch3).isNotEqualTo(epoch1); + assertThat(epochManager.outputQueue.size()).isEqualTo(1); + assertThat(epochManager.outputQueue.peek().status).isEqualTo(EpochStatus.CLOSED); + // records in first epoch are not finished, so the non-record action is not executed + assertThat(output.get()).isEqualTo(0); + epochManager.completeOneRecord(epoch1); + epochManager.completeOneRecord(epoch2); + epochManager.completeOneRecord(epoch3); + // records in first epoch are finished, so the non-record action is executed + assertThat(output.get()).isEqualTo(1); + // records in second epoch are finished, but no newly arrived non-record, so the second + // epoch is still open + assertThat(epochManager.outputQueue.size()).isEqualTo(0); + assertThat(epochManager.activeEpoch.ongoingRecordCount).isEqualTo(0); + assertThat(epochManager.activeEpoch.status).isEqualTo(EpochStatus.OPEN); + } +} diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java index 7c6887a2a2d..3d31737beb0 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java @@ -26,6 +26,7 @@ import 
org.apache.flink.configuration.ConfigConstants; import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.core.state.InternalStateFuture; +import org.apache.flink.runtime.asyncprocessing.EpochManager.Epoch; import org.apache.flink.runtime.asyncprocessing.RecordContext; import org.apache.flink.runtime.asyncprocessing.StateRequestHandler; import org.apache.flink.runtime.asyncprocessing.StateRequestType; @@ -87,7 +88,8 @@ public class ForStDBOperationTestBase { protected ContextKey<Integer> buildContextKey(int i) { int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(i, 128); - RecordContext<Integer> recordContext = new RecordContext<>(i, i, t -> {}, keyGroup); + RecordContext<Integer> recordContext = + new RecordContext<>(i, i, t -> {}, keyGroup, new Epoch(0)); return new ContextKey<>(recordContext); } diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java index 3e17f30d34b..8c597ec1167 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java @@ -18,6 +18,7 @@ package org.apache.flink.state.forst; +import org.apache.flink.runtime.asyncprocessing.EpochManager.Epoch; import org.apache.flink.runtime.asyncprocessing.RecordContext; import org.apache.flink.runtime.asyncprocessing.StateRequest; import org.apache.flink.runtime.asyncprocessing.StateRequestContainer; @@ -127,7 +128,8 @@ public class ForStStateExecutorTest extends ForStDBOperationTestBase { V value, R record) { int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, 128); - RecordContext<K> recordContext = new RecordContext<>(record, key, 
t -> {}, keyGroup); + RecordContext<K> recordContext = + new RecordContext<>(record, key, t -> {}, keyGroup, new Epoch(0)); TestStateFuture stateFuture = new TestStateFuture<>(); return new StateRequest<>(innerTable, requestType, value, stateFuture, recordContext); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java index 20441bcffa5..605186d772e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java @@ -93,7 +93,7 @@ public abstract class AbstractStreamOperatorV2<OUT> private final ExecutionConfig executionConfig; private final ClassLoader userCodeClassLoader; private final CloseableRegistry cancelables; - private final IndexedCombinedWatermarkStatus combinedWatermark; + protected final IndexedCombinedWatermarkStatus combinedWatermark; /** Metric group for the operator. 
*/ protected final InternalOperatorMetricGroup metrics; @@ -493,7 +493,7 @@ public abstract class AbstractStreamOperatorV2<OUT> } } - public final void processWatermarkStatus(WatermarkStatus watermarkStatus, int inputId) + public void processWatermarkStatus(WatermarkStatus watermarkStatus, int inputId) throws Exception { boolean wasIdle = combinedWatermark.isIdle(); if (combinedWatermark.updateStatus(inputId - 1, watermarkStatus.isIdle())) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java index 6c1598c96c8..a87a9049583 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java @@ -39,8 +39,10 @@ import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures; import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer; import org.apache.flink.streaming.api.operators.Triggerable; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; import org.apache.flink.util.function.ThrowingConsumer; import org.apache.flink.util.function.ThrowingRunnable; @@ -259,6 +261,27 @@ public abstract class AbstractAsyncStateStreamOperator<OUT> extends AbstractStre return currentProcessingContext.getKey(); } + @Override + public void processWatermark(Watermark mark) throws Exception { + if (!isAsyncStateProcessingEnabled()) { + // If 
async state processing is disabled, fallback to the super class. + super.processWatermark(mark); + return; + } + asyncExecutionController.processNonRecord(() -> super.processWatermark(mark)); + } + + @Override + public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception { + if (!isAsyncStateProcessingEnabled()) { + // If async state processing is disabled, fallback to the super class. + super.processWatermarkStatus(watermarkStatus); + return; + } + asyncExecutionController.processNonRecord( + () -> super.processWatermarkStatus(watermarkStatus)); + } + @VisibleForTesting AsyncExecutionController<?> getAsyncExecutionController() { return asyncExecutionController; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java index 92a41eaeb06..a142a59ccac 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java @@ -37,7 +37,9 @@ import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures; import org.apache.flink.streaming.api.operators.StreamOperatorParameters; import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer; import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; import org.apache.flink.util.function.ThrowingConsumer; import org.apache.flink.util.function.ThrowingRunnable; @@ -188,6 +190,35 @@ public abstract class 
AbstractAsyncStateStreamOperatorV2<OUT> extends AbstractSt (AsyncExecutionController<K>) asyncExecutionController); } + @Override + public void processWatermark(Watermark mark) throws Exception { + if (!isAsyncStateProcessingEnabled()) { + super.processWatermark(mark); + return; + } + asyncExecutionController.processNonRecord(() -> super.processWatermark(mark)); + } + + @Override + public void processWatermarkStatus(WatermarkStatus watermarkStatus, int inputId) + throws Exception { + if (!isAsyncStateProcessingEnabled()) { + super.processWatermarkStatus(watermarkStatus, inputId); + return; + } + asyncExecutionController.processNonRecord( + () -> { + boolean wasIdle = combinedWatermark.isIdle(); + if (combinedWatermark.updateStatus(inputId - 1, watermarkStatus.isIdle())) { + super.processWatermark( + new Watermark(combinedWatermark.getCombinedWatermark())); + } + if (wasIdle != combinedWatermark.isIdle()) { + output.emitWatermarkStatus(watermarkStatus); + } + }); + } + @VisibleForTesting AsyncExecutionController<?> getAsyncExecutionController() { return asyncExecutionController;