(flink) branch master updated: [FLINK-36243][state/forst] Store namespace in state request and contextKey (#25300)
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 25969c9de1e [FLINK-36243][state/forst] Store namespace in state request and contextKey (#25300) 25969c9de1e is described below commit 25969c9de1e73c2364200ae3d155aca4862ea036 Author: Yanfei Lei AuthorDate: Wed Sep 11 19:45:57 2024 +0800 [FLINK-36243][state/forst] Store namespace in state request and contextKey (#25300) --- .../asyncprocessing/AsyncExecutionController.java | 6 ++--- .../runtime/asyncprocessing/StateRequest.java | 12 +- .../asyncprocessing/StateRequestBuffer.java| 10 .../asyncprocessing/StateRequestContainer.java | 2 +- .../runtime/asyncprocessing/MockStateExecutor.java | 2 +- .../asyncprocessing/MockStateRequestContainer.java | 6 ++--- .../state/v2/InternalKeyedStateTestBase.java | 8 +++ .../org/apache/flink/state/forst/ContextKey.java | 14 ++- .../apache/flink/state/forst/ForStInnerTable.java | 4 ++-- .../apache/flink/state/forst/ForStListState.java | 15 .../apache/flink/state/forst/ForStMapState.java| 28 +++--- .../state/forst/ForStStateRequestClassifier.java | 4 ++-- .../apache/flink/state/forst/ForStValueState.java | 14 +++ .../state/forst/ForStDBOperationTestBase.java | 4 ++-- .../flink/state/forst/ForStStateExecutorTest.java | 20 15 files changed, 90 insertions(+), 59 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java index 24ae7e9342f..ad767aa340a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java @@ -239,7 +239,7 @@ public class AsyncExecutionController implements 
StateRequestHandler { @Nullable State state, StateRequestType type, @Nullable IN payload) { // Step 1: build state future & assign context. InternalStateFuture<OUT> stateFuture = stateFutureFactory.create(currentContext); -StateRequest<K, IN, OUT> request = +StateRequest<K, ?, IN, OUT> request = new StateRequest<>(state, type, payload, stateFuture, currentContext); // Step 2: try to seize the capacity, if the current in-flight records exceeds the limit, @@ -278,11 +278,11 @@ public class AsyncExecutionController implements StateRequestHandler { currentContext.setNamespace(state, namespace); } - <IN, OUT> void insertActiveBuffer(StateRequest<K, IN, OUT> request) { + <IN, OUT> void insertActiveBuffer(StateRequest<K, ?, IN, OUT> request) { stateRequestsBuffer.enqueueToActive(request); } - <IN, OUT> void insertBlockingBuffer(StateRequest<K, IN, OUT> request) { + <IN, OUT> void insertBlockingBuffer(StateRequest<K, ?, IN, OUT> request) { stateRequestsBuffer.enqueueToBlocking(request); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java index 87fd02926f2..32527a2e881 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.asyncprocessing; import org.apache.flink.api.common.state.v2.State; import org.apache.flink.core.state.InternalStateFuture; +import org.apache.flink.runtime.state.v2.InternalPartitionedState; import javax.annotation.Nullable; @@ -30,9 +31,10 @@ import java.io.Serializable; * * @param <K> Type of partitioned key. * @param <IN> Type of input of this request. + * @param <N> Type of namespace. * @param <OUT> Type of value that request will return. */ -public class StateRequest<K, IN, OUT> implements Serializable { +public class StateRequest<K, N, IN, OUT> implements Serializable { /** * The underlying state to be accessed, can be empty for {@link StateRequestType#SYNC_POINT}. 
@@ -51,6 +53,8 @@ public class StateRequest<K, IN, OUT> implements Serializable { /** The record context of this request. */ private final RecordContext<K> context; +@Nullable private final N namespace; + public StateRequest( @Nullable State state, StateRequestType type, @@ -62,6 +66,8 @@ public class StateRequest<K, IN, OUT> implements Serializable { this.payload = payload; this.stateFuture = stateFuture; this.context = context; +this.namespace = +state == null ? null : context.getNamespace((InternalPartitionedState<N>) state); } public StateRequestType getRequestType()
(flink) branch master updated: [FLINK-34510][Runtime/State]Rename RestoreMode to RecoveryClaimMode (#25192)
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 7adeecd3445 [FLINK-34510][Runtime/State]Rename RestoreMode to RecoveryClaimMode (#25192) 7adeecd3445 is described below commit 7adeecd3445947f42d3e3d1e2961b9464e910236 Author: lz <971066...@qq.com> AuthorDate: Tue Sep 10 10:32:06 2024 +0800 [FLINK-34510][Runtime/State]Rename RestoreMode to RecoveryClaimMode (#25192) --- docs/static/generated/rest_v1_dispatcher.yml | 16 ++--- .../apache/flink/client/cli/CliFrontendParser.java | 16 ++--- .../apache/flink/client/cli/ProgramOptions.java| 6 +- .../flink/client/cli/CliFrontendRunTest.java | 18 ++--- .../StandaloneApplicationClusterEntryPoint.java| 12 ++-- .../flink/configuration/StateRecoveryOptions.java | 8 +-- .../{RestoreMode.java => RecoveryClaimMode.java} | 8 +-- .../KubernetesCheckpointRecoveryFactory.java | 6 +- .../flink/kubernetes/utils/KubernetesUtils.java| 8 +-- .../runtime/webmonitor/handlers/JarRunHandler.java | 19 +++--- .../webmonitor/handlers/JarRunRequestBody.java | 25 +++ .../handlers/JarRunHandlerParameterTest.java | 6 +- .../webmonitor/handlers/JarRunRequestBodyTest.java | 6 +- .../runtime/checkpoint/CheckpointCoordinator.java | 4 +- .../runtime/checkpoint/CheckpointProperties.java | 16 +++-- .../checkpoint/CheckpointRecoveryFactory.java | 6 +- .../runtime/checkpoint/CompletedCheckpoint.java| 8 +-- .../EmbeddedCompletedCheckpointStore.java | 8 +-- .../PerJobCheckpointRecoveryFactory.java | 10 +-- .../StandaloneCheckpointRecoveryFactory.java | 6 +- .../StandaloneCompletedCheckpointStore.java| 14 ++-- .../ZooKeeperCheckpointRecoveryFactory.java| 6 +- .../cleanup/CheckpointResourcesCleanupRunner.java | 6 +- .../EmbeddedHaServicesWithLeadershipControl.java | 4 +- .../runtime/jobgraph/SavepointConfigOptions.java | 8 +-- 
.../runtime/jobgraph/SavepointRestoreSettings.java | 41 +++- .../flink/runtime/minicluster/MiniCluster.java | 6 +- .../flink/runtime/scheduler/SchedulerUtils.java| 8 +-- .../flink/runtime/state/SharedStateRegistry.java | 13 ++-- .../runtime/state/SharedStateRegistryFactory.java | 6 +- .../runtime/state/SharedStateRegistryImpl.java | 6 +- .../apache/flink/runtime/state/StateBackend.java | 6 +- .../apache/flink/runtime/util/ZooKeeperUtils.java | 8 +-- .../flink/streaming/runtime/tasks/StreamTask.java | 4 +- .../CheckpointCoordinatorFailureTest.java | 4 +- .../CheckpointCoordinatorRestoringTest.java| 8 ++- .../checkpoint/CheckpointCoordinatorTest.java | 10 +-- .../CheckpointCoordinatorTriggeringTest.java | 8 +-- .../checkpoint/CompletedCheckpointTest.java| 8 ++- .../DefaultCompletedCheckpointStoreTest.java | 4 +- .../checkpoint/PerJobCheckpointRecoveryTest.java | 10 +-- .../TestingCheckpointRecoveryFactory.java | 4 +- .../ZooKeeperCompletedCheckpointStoreITCase.java | 4 +- .../ZooKeeperCompletedCheckpointStoreTest.java | 4 +- .../dispatcher/DispatcherCleanupITCase.java| 8 +-- .../CheckpointResourcesCleanupRunnerTest.java | 4 +- .../jobgraph/SavepointRestoreSettingsTest.java | 6 +- .../runtime/scheduler/SchedulerUtilsTest.java | 10 +-- .../runtime/state/SharedStateRegistryTest.java | 4 +- .../plan/nodes/exec/testutils/RestoreTestBase.java | 4 +- .../ChangelogRecoverySwitchStateBackendITCase.java | 4 +- .../ResumeCheckpointManuallyITCase.java| 77 -- .../test/checkpointing/SavepointFormatITCase.java | 4 +- .../flink/test/checkpointing/SavepointITCase.java | 11 ++-- .../SnapshotFileMergingCompatibilityITCase.java| 48 -- 55 files changed, 313 insertions(+), 279 deletions(-) diff --git a/docs/static/generated/rest_v1_dispatcher.yml b/docs/static/generated/rest_v1_dispatcher.yml index 1ea036fc53b..ccaa7c6bc24 100644 --- a/docs/static/generated/rest_v1_dispatcher.yml +++ b/docs/static/generated/rest_v1_dispatcher.yml @@ -2288,7 +2288,7 @@ components: allowNonRestoredState: 
type: boolean claimMode: - $ref: '#/components/schemas/RestoreMode' + $ref: '#/components/schemas/RecoveryClaimMode' entryClass: type: string flinkConfiguration: @@ -2307,7 +2307,7 @@ components: items: type: string restoreMode: - $ref: '#/components/schemas/RestoreMode' + $ref: '#/components/schemas/RecoveryClaimMode' savepointPath: type: string JarRunResponseBody: @@ -2773,6 +2773,12 @@ components: $ref: '#/components/schemas/Id' RawJs
(flink) branch master updated: [hotfix][checkpoint] Rename file-merging options in documents (#25048)
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new c15145b4a8c [hotfix][checkpoint] Rename file-merging options in documents (#25048) c15145b4a8c is described below commit c15145b4a8c99fbfd99c6cec0701db0efb1646a2 Author: Yanfei Lei AuthorDate: Wed Jul 10 10:00:24 2024 +0800 [hotfix][checkpoint] Rename file-merging options in documents (#25048) --- .../docs/dev/datastream/fault-tolerance/checkpointing.md | 10 +- .../docs/dev/datastream/fault-tolerance/checkpointing.md | 10 +- .../org/apache/flink/configuration/CheckpointingOptions.java | 7 --- .../api/environment/ExecutionCheckpointingOptions.java | 7 --- 4 files changed, 18 insertions(+), 16 deletions(-) diff --git a/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md b/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md index 9d76b5bec06..3d01e0ffddf 100644 --- a/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md +++ b/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md @@ -253,18 +253,18 @@ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironm ## 统一的 checkpoint 文件合并机制 (实验性功能) Flink 1.20 引入了 MVP 版本的统一 checkpoint 文件合并机制,该机制允许把分散的 checkpoint 小文件合并到大文件中,减少 checkpoint 文件创建删除的次数, -有助于减轻文件过多问题带来的文件系统元数据管理的压力。可以通过将 `state.checkpoints.file-merging.enabled` 设置为 `true` 来开启该机制。 -**注意**,考虑 trade-off,开启该机制会导致空间放大,即文件系统上的实际占用会比 state size 更大,可以通过设置 `state.checkpoints.file-merging.max-space-amplification` +有助于减轻文件过多问题带来的文件系统元数据管理的压力。可以通过将 `execution.checkpointing.file-merging.enabled` 设置为 `true` 来开启该机制。 +**注意**,考虑 trade-off,开启该机制会导致空间放大,即文件系统上的实际占用会比 state size 更大,可以通过设置 `execution.checkpointing.file-merging.max-space-amplification` 来控制文件放大的上限。 该机制适用于 Flink 中的 keyed state、operator state 和 channel state。对 shared scope state 提供 subtask 级别的合并;对 
private scope state 提供 TaskManager 级别的合并,可以通过 - `state.checkpoints.file-merging.max-subtasks-per-file` 选项配置单个文件允许写入的最大 subtask 数目。 + `execution.checkpointing.file-merging.max-subtasks-per-file` 选项配置单个文件允许写入的最大 subtask 数目。 -统一文件合并机制也支持跨 checkpoint 的文件合并,通过设置 `state.checkpoints.file-merging.across-checkpoint-boundary` 为 `true` 开启。 +统一文件合并机制也支持跨 checkpoint 的文件合并,通过设置 `execution.checkpointing.file-merging.across-checkpoint-boundary` 为 `true` 开启。 该机制引入了文件池用于处理并发写的场景,文件池有两种模式,Non-blocking 模式的文件池会对每个文件请求即时返回一个物理文件,在频繁请求的情况下会创建出许多物理文件; -而 Blocking 模式的文件池会一直阻塞文件请求,直到文件池中有返回的文件可用,可以通过设置 `state.checkpoints.file-merging.pool-blocking` 为 `true` +而 Blocking 模式的文件池会一直阻塞文件请求,直到文件池中有返回的文件可用,可以通过设置 `execution.checkpointing.file-merging.pool-blocking` 为 `true` 选择 Blocking 模式,设置为 `false` 选择 Non-blocking 模式。 {{< top >}} diff --git a/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md b/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md index 5d410eb6780..6b3d625696d 100644 --- a/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md +++ b/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md @@ -297,21 +297,21 @@ to be completed. The unified file merging mechanism for checkpointing is introduced to Flink 1.20 as an MVP ("minimum viable product") feature, which allows scattered small checkpoint files to be written into larger files, reducing the number of file creations and file deletions, which alleviates the pressure of file system metadata management raised by the file flooding problem during checkpoints. -The mechanism can be enabled by setting `state.checkpoints.file-merging.enabled` to `true`. +The mechanism can be enabled by setting `execution.checkpointing.file-merging.enabled` to `true`. **Note** that as a trade-off, enabling this mechanism may lead to space amplification, that is, the actual occupation on the file system -will be larger than actual state size. 
`state.checkpoints.file-merging.max-space-amplification` +will be larger than actual state size. `execution.checkpointing.file-merging.max-space-amplification` can be used to limit the upper bound of space amplification. This mechanism is applicable to keyed state, operator state and channel state in Flink. Merging at subtask level is provided for shared scope state; Merging at TaskManager level is provided for private scope state. The maximum number of subtasks -allowed to be written to a single file can be configured through the `state.checkpoints.file-merging.max-subtasks-per-file` option. +allowed to be written to a single file can be configured through the `execution.checkpointing.file-merging.max-subtasks-per-file` option. This feature also supports merging files across checkpoints. To enable this, set -`state.checkpoints.file-merging.across-checkpoint-boundary` to `true`. +`execution.ch
(flink) branch release-1.20 updated: [hotfix][checkpoint] Rename file-merging options in documents (#25049)
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch release-1.20 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.20 by this push: new 0bdbfeb41be [hotfix][checkpoint] Rename file-merging options in documents (#25049) 0bdbfeb41be is described below commit 0bdbfeb41be53c8c4572937a8ee70f43ce9fd1ad Author: Yanfei Lei AuthorDate: Tue Jul 9 18:31:31 2024 +0800 [hotfix][checkpoint] Rename file-merging options in documents (#25049) --- .../docs/dev/datastream/fault-tolerance/checkpointing.md | 10 +- .../docs/dev/datastream/fault-tolerance/checkpointing.md | 10 +- .../org/apache/flink/configuration/CheckpointingOptions.java | 7 --- .../api/environment/ExecutionCheckpointingOptions.java | 7 --- 4 files changed, 18 insertions(+), 16 deletions(-) diff --git a/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md b/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md index 9d76b5bec06..3d01e0ffddf 100644 --- a/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md +++ b/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md @@ -253,18 +253,18 @@ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironm ## 统一的 checkpoint 文件合并机制 (实验性功能) Flink 1.20 引入了 MVP 版本的统一 checkpoint 文件合并机制,该机制允许把分散的 checkpoint 小文件合并到大文件中,减少 checkpoint 文件创建删除的次数, -有助于减轻文件过多问题带来的文件系统元数据管理的压力。可以通过将 `state.checkpoints.file-merging.enabled` 设置为 `true` 来开启该机制。 -**注意**,考虑 trade-off,开启该机制会导致空间放大,即文件系统上的实际占用会比 state size 更大,可以通过设置 `state.checkpoints.file-merging.max-space-amplification` +有助于减轻文件过多问题带来的文件系统元数据管理的压力。可以通过将 `execution.checkpointing.file-merging.enabled` 设置为 `true` 来开启该机制。 +**注意**,考虑 trade-off,开启该机制会导致空间放大,即文件系统上的实际占用会比 state size 更大,可以通过设置 `execution.checkpointing.file-merging.max-space-amplification` 来控制文件放大的上限。 该机制适用于 Flink 中的 keyed state、operator state 和 channel state。对 shared scope state 提供 subtask 
级别的合并;对 private scope state 提供 TaskManager 级别的合并,可以通过 - `state.checkpoints.file-merging.max-subtasks-per-file` 选项配置单个文件允许写入的最大 subtask 数目。 + `execution.checkpointing.file-merging.max-subtasks-per-file` 选项配置单个文件允许写入的最大 subtask 数目。 -统一文件合并机制也支持跨 checkpoint 的文件合并,通过设置 `state.checkpoints.file-merging.across-checkpoint-boundary` 为 `true` 开启。 +统一文件合并机制也支持跨 checkpoint 的文件合并,通过设置 `execution.checkpointing.file-merging.across-checkpoint-boundary` 为 `true` 开启。 该机制引入了文件池用于处理并发写的场景,文件池有两种模式,Non-blocking 模式的文件池会对每个文件请求即时返回一个物理文件,在频繁请求的情况下会创建出许多物理文件; -而 Blocking 模式的文件池会一直阻塞文件请求,直到文件池中有返回的文件可用,可以通过设置 `state.checkpoints.file-merging.pool-blocking` 为 `true` +而 Blocking 模式的文件池会一直阻塞文件请求,直到文件池中有返回的文件可用,可以通过设置 `execution.checkpointing.file-merging.pool-blocking` 为 `true` 选择 Blocking 模式,设置为 `false` 选择 Non-blocking 模式。 {{< top >}} diff --git a/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md b/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md index 5d410eb6780..6b3d625696d 100644 --- a/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md +++ b/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md @@ -297,21 +297,21 @@ to be completed. The unified file merging mechanism for checkpointing is introduced to Flink 1.20 as an MVP ("minimum viable product") feature, which allows scattered small checkpoint files to be written into larger files, reducing the number of file creations and file deletions, which alleviates the pressure of file system metadata management raised by the file flooding problem during checkpoints. -The mechanism can be enabled by setting `state.checkpoints.file-merging.enabled` to `true`. +The mechanism can be enabled by setting `execution.checkpointing.file-merging.enabled` to `true`. **Note** that as a trade-off, enabling this mechanism may lead to space amplification, that is, the actual occupation on the file system -will be larger than actual state size. 
`state.checkpoints.file-merging.max-space-amplification` +will be larger than actual state size. `execution.checkpointing.file-merging.max-space-amplification` can be used to limit the upper bound of space amplification. This mechanism is applicable to keyed state, operator state and channel state in Flink. Merging at subtask level is provided for shared scope state; Merging at TaskManager level is provided for private scope state. The maximum number of subtasks -allowed to be written to a single file can be configured through the `state.checkpoints.file-merging.max-subtasks-per-file` option. +allowed to be written to a single file can be configured through the `execution.checkpointing.file-merging.max-subtasks-per-file` option. This feature also supports merging files across checkpoints. To enable this, set -`state.checkpoints.file-merging.across-checkpoint-bo
(flink) branch release-1.18 updated: [BP-1.18] [FLINK-33192][runtime] Fix Memory Leak in WindowOperator due to Improper Timer Cleanup (#25032)
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.18 by this push: new 87b36233512 [BP-1.18] [FLINK-33192][runtime] Fix Memory Leak in WindowOperator due to Improper Timer Cleanup (#25032) 87b36233512 is described below commit 87b362335128e62112cf34839837b2d635d93df2 Author: Kartikey Pant AuthorDate: Mon Jul 8 07:41:47 2024 +0530 [BP-1.18] [FLINK-33192][runtime] Fix Memory Leak in WindowOperator due to Improper Timer Cleanup (#25032) * [FLINK-33192][runtime] Fix Memory Leak in WindowOperator due to Improper Timer Cleanup * Fix one of the added WindowOperatorTest methods to be public --- .../operators/windowing/WindowOperator.java| 10 +- .../operators/windowing/WindowOperatorTest.java| 151 + 2 files changed, 155 insertions(+), 6 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index a45b5b68e72..a660a4c2ea6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -372,10 +372,9 @@ public class WindowOperator if (triggerResult.isFire()) { ACC contents = windowState.get(); -if (contents == null) { -continue; +if (contents != null) { +emitWindowContents(actualWindow, contents); } -emitWindowContents(actualWindow, contents); } if (triggerResult.isPurge()) { @@ -405,10 +404,9 @@ public class WindowOperator if (triggerResult.isFire()) { ACC contents = windowState.get(); -if (contents == null) { -continue; +if (contents != null) { +emitWindowContents(window, contents); } -emitWindowContents(window, contents); } if 
(triggerResult.isPurge()) { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java index bc5ef7f0b05..dda2728ab44 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java @@ -84,6 +84,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -3069,6 +3070,114 @@ public class WindowOperatorTest extends TestLogger { testHarness.close(); } +@Test +public void testCleanupTimerWithEmptyStateNoResultForTumblingWindows() throws Exception { +final int windowSize = 2; +final long lateness = 1; + +ListStateDescriptor> windowStateDesc = +new ListStateDescriptor<>( +"window-contents", +STRING_INT_TUPLE.createSerializer(new ExecutionConfig())); + +WindowOperator< +String, +Tuple2, +Iterable>, +Tuple2, +TimeWindow> +operator = +new WindowOperator<>( + TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)), +new TimeWindow.Serializer(), +new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer( +new ExecutionConfig()), +windowStateDesc, +new InternalIterableWindowFunction<>(new EmptyReturnFunction()), +new FireEverytimeOnElementAndEventTimeTrigger(), +lateness, +null /* late data output tag */); + +OneInputStreamOperatorTestHarness, Tuple2> +
(flink) branch master updated: [FLINK-33192][runtime] Fix Memory Leak in WindowOperator due to Improper Timer Cleanup (#24917)
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new e78524a8780 [FLINK-33192][runtime] Fix Memory Leak in WindowOperator due to Improper Timer Cleanup (#24917) e78524a8780 is described below commit e78524a87808913bfcd6a84847af46501a9a7266 Author: Kartikey Pant AuthorDate: Sat Jul 6 10:04:43 2024 +0530 [FLINK-33192][runtime] Fix Memory Leak in WindowOperator due to Improper Timer Cleanup (#24917) --- .../operators/windowing/WindowOperator.java| 10 +- .../operators/windowing/WindowOperatorTest.java| 150 + 2 files changed, 154 insertions(+), 6 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index 5c3929f79dc..e67800f712d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -376,10 +376,9 @@ public class WindowOperator if (triggerResult.isFire()) { ACC contents = windowState.get(); -if (contents == null) { -continue; +if (contents != null) { +emitWindowContents(actualWindow, contents); } -emitWindowContents(actualWindow, contents); } if (triggerResult.isPurge()) { @@ -409,10 +408,9 @@ public class WindowOperator if (triggerResult.isFire()) { ACC contents = windowState.get(); -if (contents == null) { -continue; +if (contents != null) { +emitWindowContents(window, contents); } -emitWindowContents(window, contents); } if (triggerResult.isPurge()) { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java index 9950dd89b95..48a09ec4fa2 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java @@ -3130,6 +3130,114 @@ class WindowOperatorTest { testHarness.close(); } +@Test +void testCleanupTimerWithEmptyStateNoResultForTumblingWindows() throws Exception { +final int windowSize = 2; +final long lateness = 1; + +ListStateDescriptor> windowStateDesc = +new ListStateDescriptor<>( +"window-contents", +STRING_INT_TUPLE.createSerializer(new SerializerConfigImpl())); + +WindowOperator< +String, +Tuple2, +Iterable>, +Tuple2, +TimeWindow> +operator = +new WindowOperator<>( + TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)), +new TimeWindow.Serializer(), +new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer( +new SerializerConfigImpl()), +windowStateDesc, +new InternalIterableWindowFunction<>(new EmptyReturnFunction()), +new FireEverytimeOnElementAndEventTimeTrigger(), +lateness, +null /* late data output tag */); + +OneInputStreamOperatorTestHarness, Tuple2> +testHarness = createTestHarness(operator); + +testHarness.open(); + +ConcurrentLinkedQueue expected = new ConcurrentLinkedQueue<>(); +// normal element +testHarness.processElement(new StreamRecord<>(new Tuple2<>("test_key", 1), 1000)); +assertThat( +operator.processContext +.windowState() +.getListState(windowStateDesc) +.get() +
(flink) branch master updated: [FLINK-32091][checkpoint] Add file size metrics for file-merging (#24922)
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 54978689f68 [FLINK-32091][checkpoint] Add file size metrics for file-merging (#24922) 54978689f68 is described below commit 54978689f68ffa03e7b89292f6a7aa94a2406116 Author: Yanfei Lei AuthorDate: Fri Jun 14 18:38:42 2024 +0800 [FLINK-32091][checkpoint] Add file size metrics for file-merging (#24922) --- docs/content.zh/docs/ops/metrics.md| 21 + docs/content/docs/ops/metrics.md | 21 + ...AcrossCheckpointFileMergingSnapshotManager.java | 6 +- .../filemerging/FileMergingMetricGroup.java| 54 +++ .../FileMergingSnapshotManagerBase.java| 8 +- .../FileMergingSnapshotManagerBuilder.java | 21 - ...WithinCheckpointFileMergingSnapshotManager.java | 6 +- .../state/TaskExecutorFileMergingManager.java | 5 +- .../flink/runtime/taskexecutor/TaskExecutor.java | 3 +- .../filemerging/FileMergingMetricsTest.java| 103 + .../FileMergingSnapshotManagerTestBase.java| 4 + .../state/TaskExecutorFileMergingManagerTest.java | 25 - 12 files changed, 264 insertions(+), 13 deletions(-) diff --git a/docs/content.zh/docs/ops/metrics.md b/docs/content.zh/docs/ops/metrics.md index f0778bdc292..e461044dddf 100644 --- a/docs/content.zh/docs/ops/metrics.md +++ b/docs/content.zh/docs/ops/metrics.md @@ -1358,6 +1358,27 @@ Note that for failed checkpoints, metrics are updated on a best efforts basis an The time in nanoseconds that elapsed between the creation of the last checkpoint and the time when the checkpointing process has started by this Task. This delay shows how long it takes for the first checkpoint barrier to reach the task. A high value indicates back-pressure. If only a specific task has a long start delay, the most likely reason is data skew. 
Gauge + + Job (only available on TaskManager) + fileMerging.logicalFileCount + The number of logical files of file merging mechanism. + Gauge + + + fileMerging.logicalFileSize + The total size of logical files of file merging mechanism on one task manager for one job. + Gauge + + + fileMerging.physicalFileCount + The number of physical files of file merging mechanism. + Gauge + + + fileMerging.physicalFileSize + The total size of physical files of file merging mechanism on one task manager for one job, usually larger than fileMerging.logicalFileSize. + Gauge + diff --git a/docs/content/docs/ops/metrics.md b/docs/content/docs/ops/metrics.md index f65d3cf74e4..5ab5cb21180 100644 --- a/docs/content/docs/ops/metrics.md +++ b/docs/content/docs/ops/metrics.md @@ -1348,6 +1348,27 @@ Note that for failed checkpoints, metrics are updated on a best efforts basis an The time in nanoseconds that elapsed between the creation of the last checkpoint and the time when the checkpointing process has started by this Task. This delay shows how long it takes for the first checkpoint barrier to reach the task. A high value indicates back-pressure. If only a specific task has a long start delay, the most likely reason is data skew. Gauge + + Job (only available on TaskManager) + fileMerging.logicalFileCount + The number of logical files of file merging mechanism. + Gauge + + + fileMerging.logicalFileSize + The total size of logical files of file merging mechanism on one task manager for one job. + Gauge + + + fileMerging.physicalFileCount + The number of physical files of file merging mechanism. + Gauge + + + fileMerging.physicalFileSize + The total size of physical files of file merging mechanism on one task manager for one job, usually larger than fileMerging.logicalFileSize. 
+ Gauge + diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManager.java index be0aeed9a58..6bebd1586f3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManager.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.checkpoint.filemerging; import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.state.CheckpointedStateScope; import javax.annotation.Nonnull; @@ -35,8 +36,9 @@ public
(flink) branch master updated (9fbe9f30480 -> 1e996b8731b)
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 9fbe9f30480 [FLINK-35460][state] Adjust read size when ByteBuffer size is larger than file size for ForSt (#24847) add 1e996b8731b [FLINK-35457][checkpoint] Hotfix! close physical file under the protection of lock (#24846) No new revisions were added by this update. Summary of changes: .../FileMergingSnapshotManagerBase.java| 61 -- .../checkpoint/filemerging/PhysicalFile.java | 13 +++-- 2 files changed, 41 insertions(+), 33 deletions(-)
(flink) branch master updated: [FLINK-35382][test] Disable snapshot-file-merging in ChangelogCompatibilityITCase (#24813)
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 26b149a1664 [FLINK-35382][test] Disable snapshot-file-merging in ChangelogCompatibilityITCase (#24813) 26b149a1664 is described below commit 26b149a1664ca8ea395badbb02e1e245a623ee90 Author: Jinzhong Li AuthorDate: Tue May 21 10:30:53 2024 +0800 [FLINK-35382][test] Disable snapshot-file-merging in ChangelogCompatibilityITCase (#24813) --- .../apache/flink/test/state/ChangelogCompatibilityITCase.java | 11 +++ 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java index 3844194821a..675d26bb3c6 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java @@ -137,9 +137,9 @@ public class ChangelogCompatibilityITCase { private StreamExecutionEnvironment initEnvironment() { Configuration conf = new Configuration(); -conf.set( -FILE_MERGING_ENABLED, false); // TODO: remove file merging setting after FLINK-32085 -StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +// TODO:remove file-merging setting after FLINK-32085 & FLINK-32081 are resolved. 
+conf.set(FILE_MERGING_ENABLED, false); +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.enableChangelogStateBackend(testCase.startWithChangelog); if (testCase.restoreSource == RestoreSource.CHECKPOINT) { env.enableCheckpointing(50); @@ -182,7 +182,10 @@ public class ChangelogCompatibilityITCase { } private void restoreAndValidate(String location) { -StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +Configuration conf = new Configuration(); +// TODO:remove file-merging setting after FLINK-32085 & FLINK-32081 are resolved. +conf.set(FILE_MERGING_ENABLED, false); +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.enableChangelogStateBackend(testCase.restoreWithChangelog); JobGraph jobGraph = addGraph(env); jobGraph.setSavepointRestoreSettings(forPath(location));
(flink) branch master updated: [FLINK-35030][runtime] Introduce Epoch Manager for under async execution (#24748)
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new f1ecb9e4701 [FLINK-35030][runtime] Introduce Epoch Manager for under async execution (#24748) f1ecb9e4701 is described below commit f1ecb9e4701d612050da54589a8f561857debf34 Author: Yanfei Lei AuthorDate: Fri May 17 13:24:38 2024 +0800 [FLINK-35030][runtime] Introduce Epoch Manager for under async execution (#24748) --- .../asyncprocessing/AsyncExecutionController.java | 41 +++- .../runtime/asyncprocessing/EpochManager.java | 210 + .../runtime/asyncprocessing/RecordContext.java | 23 ++- .../AsyncExecutionControllerTest.java | 92 + .../ContextStateFutureImplTest.java| 3 +- .../runtime/asyncprocessing/EpochManagerTest.java | 61 ++ .../state/forst/ForStDBOperationTestBase.java | 4 +- .../flink/state/forst/ForStStateExecutorTest.java | 4 +- .../api/operators/AbstractStreamOperatorV2.java| 4 +- .../AbstractAsyncStateStreamOperator.java | 23 +++ .../AbstractAsyncStateStreamOperatorV2.java| 31 +++ 11 files changed, 484 insertions(+), 12 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java index f33794e16c9..693ad8753f1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.operators.MailboxExecutor; import org.apache.flink.api.common.state.v2.State; import org.apache.flink.core.state.InternalStateFuture; import org.apache.flink.core.state.StateFutureImpl.AsyncFrameworkExceptionHandler; +import 
org.apache.flink.runtime.asyncprocessing.EpochManager.ParallelMode; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.util.Preconditions; import org.apache.flink.util.function.ThrowingRunnable; @@ -76,6 +77,9 @@ public class AsyncExecutionController implements StateRequestHandler { */ private final MailboxExecutor mailboxExecutor; +/** Exception handler to handle the exception thrown by asynchronous framework. */ +private final AsyncFrameworkExceptionHandler exceptionHandler; + /** The key accounting unit which is used to detect the key conflict. */ final KeyAccountingUnit keyAccountingUnit; @@ -86,7 +90,7 @@ public class AsyncExecutionController implements StateRequestHandler { private final StateFutureFactory stateFutureFactory; /** The state executor where the {@link StateRequest} is actually executed. */ -StateExecutor stateExecutor; +final StateExecutor stateExecutor; /** The corresponding context that currently runs in task thread. */ RecordContext currentContext; @@ -102,6 +106,15 @@ public class AsyncExecutionController implements StateRequestHandler { /** Max parallelism of the job. */ private final int maxParallelism; +/** The reference of epoch manager. */ +final EpochManager epochManager; + +/** + * The parallel mode of epoch execution. Keep this field internal for now, until we could see + * the concrete need for {@link ParallelMode#PARALLEL_BETWEEN_EPOCH} from average users. 
+ */ +final ParallelMode epochParallelMode = ParallelMode.SERIAL_BETWEEN_EPOCH; + public AsyncExecutionController( MailboxExecutor mailboxExecutor, AsyncFrameworkExceptionHandler exceptionHandler, @@ -112,6 +125,7 @@ public class AsyncExecutionController implements StateRequestHandler { int maxInFlightRecords) { this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords); this.mailboxExecutor = mailboxExecutor; +this.exceptionHandler = exceptionHandler; this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor, exceptionHandler); this.stateExecutor = stateExecutor; this.batchSize = batchSize; @@ -131,11 +145,13 @@ public class AsyncExecutionController implements StateRequestHandler { }, "AEC-buffer-timeout")); +this.epochManager = new EpochManager(this); LOG.info( -"Create AsyncExecutionController: batchSize {}, bufferTimeout {}, maxInFlightRecordNum {}", +"Create AsyncExecutionController: batchSize {}, bufferTimeout {}, maxInFlightRecordNum {}, epochParallelMode {}", this.batc
(flink) branch master updated: [FLINK-32082][docs] Documentation of checkpoint file-merging (#24766)
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new a8cf2ba1224 [FLINK-32082][docs] Documentation of checkpoint file-merging (#24766) a8cf2ba1224 is described below commit a8cf2ba1224b0729eedc8712cfe60c30c590fc11 Author: Yanfei Lei AuthorDate: Thu May 16 15:42:38 2024 +0800 [FLINK-32082][docs] Documentation of checkpoint file-merging (#24766) --- .../datastream/fault-tolerance/checkpointing.md| 17 + .../datastream/fault-tolerance/checkpointing.md| 22 ++ 2 files changed, 39 insertions(+) diff --git a/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md b/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md index b8148aaae10..9acbdd4c324 100644 --- a/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md +++ b/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md @@ -250,5 +250,22 @@ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironm 需要注意的是,这一行为可能会延长任务运行的时间,如果 checkpoint 周期比较大,这一延迟会非常明显。 极端情况下,如果 checkpoint 的周期被设置为 `Long.MAX_VALUE`,那么任务永远不会结束,因为下一次 checkpoint 不会进行。 +## 统一的 checkpoint 文件合并机制 (实验性功能) + +Flink 1.20 引入了 MVP 版本的统一 checkpoint 文件合并机制,该机制允许把分散的 checkpoint 小文件合并到大文件中,减少 checkpoint 文件创建删除的次数, +有助于减轻文件过多问题带来的文件系统元数据管理的压力。可以通过将 `state.checkpoints.file-merging.enabled` 设置为 `true` 来开启该机制。 +**注意**,考虑 trade-off,开启该机制会导致空间放大,即文件系统上的实际占用会比 state size 更大,可以通过设置 `state.checkpoints.file-merging.max-space-amplification` +来控制文件放大的上限。 + +该机制适用于 Flink 中的 keyed state、operator state 和 channel state。对 shared scope state +提供 subtask 级别的合并;对 private scope state 提供 TaskManager 级别的合并,可以通过 + `state.checkpoints.file-merging.max-subtasks-per-file` 选项配置单个文件允许写入的最大 subtask 数目。 + +统一文件合并机制也支持跨 checkpoint 的文件合并,通过设置 `state.checkpoints.file-merging.across-checkpoint-boundary` 为 `true` 开启。 + 
+该机制引入了文件池用于处理并发写的场景,文件池有两种模式,Non-blocking 模式的文件池会对每个文件请求即时返回一个物理文件,在频繁请求的情况下会创建出许多物理文件; +而 Blocking 模式的文件池会一直阻塞文件请求,直到文件池中有返回的文件可用,可以通过设置 `state.checkpoints.file-merging.pool-blocking` 为 `true` +选择 Blocking 模式,设置为 `false` 选择 Non-blocking 模式。 + {{< top >}} diff --git a/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md b/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md index 5dc9e8499f6..b8eb5888fa8 100644 --- a/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md +++ b/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md @@ -292,4 +292,26 @@ The final checkpoint would be triggered immediately after all operators have rea without waiting for periodic triggering, but the job will need to wait for this final checkpoint to be completed. +## Unify file merging mechanism for checkpoints (Experimental) + +The unified file merging mechanism for checkpointing is introduced to Flink 1.20 as an MVP ("minimum viable product") feature, +which allows scattered small checkpoint files to be written into larger files, reducing the number of file creations +and file deletions, which alleviates the pressure of file system metadata management raised by the file flooding problem during checkpoints. +The mechanism can be enabled by setting `state.checkpoints.file-merging.enabled` to `true`. +**Note** that as a trade-off, enabling this mechanism may lead to space amplification, that is, the actual occupation on the file system +will be larger than actual state size. `state.checkpoints.file-merging.max-space-amplification` +can be used to limit the upper bound of space amplification. + +This mechanism is applicable to keyed state, operator state and channel state in Flink. Merging at subtask level is +provided for shared scope state; Merging at TaskManager level is provided for private scope state. 
The maximum number of subtasks +allowed to be written to a single file can be configured through the `state.checkpoints.file-merging.max-subtasks-per-file` option. + +This feature also supports merging files across checkpoints. To enable this, set +`state.checkpoints.file-merging.across-checkpoint-boundary` to `true`. + +This mechanism introduces a file pool to handle concurrent writing scenarios. There are two modes: the non-blocking mode +always provides a usable physical file without blocking when it receives a file request, so it may create many physical files if files +are requested frequently; the blocking mode blocks a file request until a returned file is available in the file pool. This can be configured by +setting `state.checkpoints.file-merging.pool-blocking` to `true` for blocking or `false` for non-blocking. + {{< top >}}
(flink) branch master updated: [FLINK-32092][tests] Integrate snapshot file-merging with existing IT cases (#24789)
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new b87ead743dc [FLINK-32092][tests] Integrate snapshot file-merging with existing IT cases (#24789) b87ead743dc is described below commit b87ead743dca161cdae8a1fef761954d206b81fb Author: Yanfei Lei AuthorDate: Thu May 16 10:12:12 2024 +0800 [FLINK-32092][tests] Integrate snapshot file-merging with existing IT cases (#24789) --- .../flink/runtime/state/OperatorStateRestoreOperation.java | 4 +++- .../runtime/state/filemerging/SegmentFileStateHandle.java | 3 ++- .../org/apache/flink/streaming/runtime/tasks/StreamTask.java | 10 ++ .../org/apache/flink/streaming/util/TestStreamEnvironment.java | 7 ++- .../flink/test/checkpointing/ChangelogRecoveryITCaseBase.java | 6 +- .../flink/test/checkpointing/RestoreUpgradedJobITCase.java | 7 ++- .../apache/flink/test/state/ChangelogCompatibilityITCase.java | 4 .../flink/test/state/ChangelogRecoveryCachingITCase.java | 3 +++ .../org/apache/flink/test/state/ChangelogRescalingITCase.java | 3 +++ 9 files changed, 38 insertions(+), 9 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java index 33ec3d92ab8..e7ba144d5b7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java @@ -24,6 +24,7 @@ import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import 
org.apache.flink.runtime.state.filemerging.EmptyFileMergingOperatorStreamStateHandle; import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; import org.apache.flink.util.Preconditions; @@ -66,7 +67,8 @@ public class OperatorStateRestoreOperation implements RestoreOperation { for (OperatorStateHandle stateHandle : stateHandles) { -if (stateHandle == null) { +if (stateHandle == null +|| stateHandle instanceof EmptyFileMergingOperatorStreamStateHandle) { continue; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/SegmentFileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/SegmentFileStateHandle.java index 11d68e7b394..a4b31cc310e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/SegmentFileStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/SegmentFileStateHandle.java @@ -106,7 +106,8 @@ public class SegmentFileStateHandle implements StreamStateHandle { @Override public PhysicalStateHandleID getStreamStateHandleID() { -return new PhysicalStateHandleID(filePath.toUri().toString()); +return new PhysicalStateHandleID( +String.format("%s-%d-%d", filePath.toUri(), startPos, stateSize)); } public long getStartPos() { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index e0ffe9b9267..433edf63f25 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -562,10 +562,12 @@ public abstract class StreamTask> return checkpointStorageAccess; } try { -CheckpointStorageWorkerView mergingCheckpointStorageAccess = -checkpointStorageAccess.toFileMergingStorage( -fileMergingSnapshotManager, environment); -return 
(CheckpointStorageAccess) mergingCheckpointStorageAccess; +CheckpointStorageAccess mergingCheckpointStorageAccess = +(CheckpointStorageAccess) +checkpointStorageAccess.toFileMergingStorage( +fileMergingSnapshotManager, environment); + mergingCheckpointStorageAccess.initializeBaseLocationsForCheckpoint(); +return mergingCheckpointStorageAccess; } catch (IOException e) { LOG.warn( "Initiating FsMergingCheckpointStorageAccess failed " diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnv
(flink) branch master updated: [FLINK-32087][checkpoint] Introduce space amplification statistics of file merging (#24762)
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 9a5a99b1a30 [FLINK-32087][checkpoint] Introduce space amplification statistics of file merging (#24762) 9a5a99b1a30 is described below commit 9a5a99b1a30054268bbde36d565cbb1b81018890 Author: Yanfei Lei AuthorDate: Tue May 14 16:21:03 2024 +0800 [FLINK-32087][checkpoint] Introduce space amplification statistics of file merging (#24762) --- .../filemerging/FileMergingSnapshotManager.java| 75 ++ .../FileMergingSnapshotManagerBase.java| 49 ++ .../checkpoint/filemerging/LogicalFile.java| 1 + .../checkpoint/filemerging/PhysicalFile.java | 32 +++-- ...ssCheckpointFileMergingSnapshotManagerTest.java | 30 - .../FileMergingSnapshotManagerTestBase.java| 47 ++ ...inCheckpointFileMergingSnapshotManagerTest.java | 30 - ...FileMergingCheckpointStateOutputStreamTest.java | 5 +- 8 files changed, 247 insertions(+), 22 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java index add8806369c..f3523c4430f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java @@ -34,6 +34,7 @@ import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageAccess; import java.io.Closeable; import java.util.Collection; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Stream; /** @@ -268,4 +269,78 @@ public interface FileMergingSnapshotManager extends Closeable { "%s-%s(%d/%d)", jobIDString, operatorIDString, subtaskIndex, parallelism); } } + +/** Space usage statistics of a 
managed directory. */ +final class SpaceStat { + +AtomicLong physicalFileCount; +AtomicLong physicalFileSize; + +AtomicLong logicalFileCount; +AtomicLong logicalFileSize; + +public SpaceStat() { +this(0, 0, 0, 0); +} + +public SpaceStat( +long physicalFileCount, +long physicalFileSize, +long logicalFileCount, +long logicalFileSize) { +this.physicalFileCount = new AtomicLong(physicalFileCount); +this.physicalFileSize = new AtomicLong(physicalFileSize); +this.logicalFileCount = new AtomicLong(logicalFileCount); +this.logicalFileSize = new AtomicLong(logicalFileSize); +} + +public void onLogicalFileCreate(long size) { +physicalFileSize.addAndGet(size); +logicalFileSize.addAndGet(size); +logicalFileCount.incrementAndGet(); +} + +public void onLogicalFileDelete(long size) { +logicalFileSize.addAndGet(-size); +logicalFileCount.decrementAndGet(); +} + +public void onPhysicalFileCreate() { +physicalFileCount.incrementAndGet(); +} + +public void onPhysicalFileDelete(long size) { +physicalFileSize.addAndGet(-size); +physicalFileCount.decrementAndGet(); +} + +@Override +public boolean equals(Object o) { +if (this == o) { +return true; +} +if (o == null || getClass() != o.getClass()) { +return false; +} +SpaceStat spaceStat = (SpaceStat) o; +return physicalFileCount.get() == spaceStat.physicalFileCount.get() +&& physicalFileSize.get() == spaceStat.physicalFileSize.get() +&& logicalFileCount.get() == spaceStat.logicalFileCount.get() +&& logicalFileSize.get() == spaceStat.logicalFileSize.get(); +} + +@Override +public String toString() { +return "SpaceStat{" ++ "physicalFileCount=" ++ physicalFileCount.get() ++ ", physicalFileSize=" ++ physicalFileSize.get() ++ ", logicalFileCount=" ++ logicalFileCount.get() ++ ", logicalFileSize=" ++ logicalFileSize.get() ++ '}'; +} +} } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java b/flink-runtime/src/main/java/org/apa
(flink) branch master updated (547e4b53ebe -> 4fe66e06974)
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 547e4b53ebe [FLINK-35270]Enrich information in logs, making it easier for debugging add 4fe66e06974 [FLINK-32084][checkpoint] Migrate current file merging of channel state snapshot into the unify file merging framework No new revisions were added by this update. Summary of changes: .../ChannelStateWriteRequestDispatcherImpl.java| 20 +- .../ChannelStateWriteRequestExecutorFactory.java | 24 +--- .../checkpoint/channel/ChannelStateWriterImpl.java | 13 --- .../filesystem/FsCheckpointStorageAccess.java | 1 - .../FsMergingCheckpointStorageAccess.java | 6 ++- ...ChannelStateWriteRequestDispatcherImplTest.java | 12 ++ .../ChannelStateWriteRequestDispatcherTest.java| 3 +- ...hannelStateWriteRequestExecutorFactoryTest.java | 11 -- .../ChannelStateWriteRequestExecutorImplTest.java | 3 +- .../channel/ChannelStateWriterImplTest.java| 12 -- .../operators/testutils/DummyEnvironment.java | 14 +++ .../runtime/state/ChannelPersistenceITCase.java| 4 +- .../environment/ExecutionCheckpointingOptions.java | 9 + .../flink/streaming/runtime/tasks/StreamTask.java | 43 ++ .../tasks/SubtaskCheckpointCoordinatorImpl.java| 43 +++--- .../MockSubtaskCheckpointCoordinatorBuilder.java | 27 -- .../runtime/tasks/StreamMockEnvironment.java | 11 ++ .../tasks/StreamTaskMailboxTestHarnessBuilder.java | 4 ++ .../tasks/SubtaskCheckpointCoordinatorTest.java| 10 - 19 files changed, 166 insertions(+), 104 deletions(-)
(flink) branch master updated: [FLINK-35158][runtime] Error handling in StateFuture's callback
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new bb0f4429598 [FLINK-35158][runtime] Error handling in StateFuture's callback bb0f4429598 is described below commit bb0f4429598db703e69d951986291862cbd5416b Author: fredia AuthorDate: Mon Apr 22 22:24:29 2024 +0800 [FLINK-35158][runtime] Error handling in StateFuture's callback --- .../flink/api/common/state/v2/StateFuture.java | 18 +- .../flink/util/function/FunctionWithException.java | 22 ++ .../flink/util/function/ThrowingConsumer.java | 19 ++ .../flink/core/state/CompletedStateFuture.java | 39 ++- .../flink/core/state/InternalStateFuture.java | 5 +- .../apache/flink/core/state/StateFutureImpl.java | 262 + .../apache/flink/core/state/StateFutureTest.java | 43 +++- .../asyncprocessing/AsyncExecutionController.java | 16 +- .../asyncprocessing/AsyncStateException.java | 34 ++- .../asyncprocessing/ContextStateFutureImpl.java| 19 +- .../asyncprocessing/StateFutureFactory.java| 10 +- .../AsyncExecutionControllerTest.java | 161 - .../ContextStateFutureImplTest.java| 129 -- .../state/forst/ForStDBOperationTestBase.java | 39 +-- .../AbstractAsyncStateStreamOperator.java | 10 +- .../AbstractAsyncStateStreamOperatorV2.java| 15 +- .../InternalTimerServiceAsyncImplTest.java | 17 +- .../AbstractAsyncStateStreamOperatorTest.java | 8 +- .../AbstractAsyncStateStreamOperatorV2Test.java| 11 +- 19 files changed, 640 insertions(+), 237 deletions(-) diff --git a/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/StateFuture.java b/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/StateFuture.java index 1f7b2f0d569..3c7faf6b928 100644 --- a/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/StateFuture.java +++ b/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/StateFuture.java @@ 
-19,10 +19,9 @@ package org.apache.flink.api.common.state.v2; import org.apache.flink.annotation.Experimental; - -import java.util.function.BiFunction; -import java.util.function.Consumer; -import java.util.function.Function; +import org.apache.flink.util.function.BiFunctionWithException; +import org.apache.flink.util.function.FunctionWithException; +import org.apache.flink.util.function.ThrowingConsumer; /** * StateFuture is a future that act as a return value for async state interfaces. Note: All these @@ -40,7 +39,8 @@ public interface StateFuture { * @param the function's return type. * @return the new StateFuture. */ - StateFuture thenApply(Function fn); + StateFuture thenApply( +FunctionWithException fn); /** * Returns a new StateFuture that, when this future completes normally, is executed with this @@ -49,7 +49,7 @@ public interface StateFuture { * @param action the action to perform before completing the returned StateFuture. * @return the new StateFuture. */ -StateFuture thenAccept(Consumer action); +StateFuture thenAccept(ThrowingConsumer action); /** * Returns a new future that, when this future completes normally, is executed with this future @@ -58,7 +58,8 @@ public interface StateFuture { * @param action the action to perform. * @return the new StateFuture. */ - StateFuture thenCompose(Function> action); + StateFuture thenCompose( +FunctionWithException, ? extends Exception> action); /** * Returns a new StateFuture that, when this and the other given future both complete normally, @@ -71,5 +72,6 @@ public interface StateFuture { * @return the new StateFuture. 
*/ StateFuture thenCombine( -StateFuture other, BiFunction fn); +StateFuture other, +BiFunctionWithException fn); } diff --git a/flink-core-api/src/main/java/org/apache/flink/util/function/FunctionWithException.java b/flink-core-api/src/main/java/org/apache/flink/util/function/FunctionWithException.java index 76718656f06..86466c05d39 100644 --- a/flink-core-api/src/main/java/org/apache/flink/util/function/FunctionWithException.java +++ b/flink-core-api/src/main/java/org/apache/flink/util/function/FunctionWithException.java @@ -20,6 +20,8 @@ package org.apache.flink.util.function; import org.apache.flink.annotation.Public; +import java.util.function.Function; + /** * A functional interface for a {@link java.util.function.Function} that may throw exceptions. * @@ -39,4 +41,24 @@ public interface FunctionWithException { * @throws E This function may throw an exception. */ R apply(T value) throws E; + +/** +
(flink) branch master updated (714d1cb2e0b -> 3ff2ba43720)
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 714d1cb2e0b [FLINK-35189][test-utils] Introduce test-filesystem connector and catalog based on filesystem to support materialized table add 713c30f76d5 [FLINK-35026][runtime][config] Introduce async execution configurations add 3ff2ba43720 [FLINK-35026][runtime] Implement buffer timeout of AEC No new revisions were added by this update. Summary of changes: .../apache/flink/api/common/ExecutionConfig.java | 38 + .../flink/configuration/ExecutionOptions.java | 59 +++ .../asyncprocessing/AsyncExecutionController.java | 48 +++--- .../asyncprocessing/StateRequestBuffer.java| 83 +- .../AsyncExecutionControllerTest.java | 182 + .../AbstractAsyncStateStreamOperator.java | 23 ++- .../AbstractAsyncStateStreamOperatorV2.java| 18 +- .../StreamExecutionEnvironmentTest.java| 30 .../InternalTimerServiceAsyncImplTest.java | 2 +- 9 files changed, 418 insertions(+), 65 deletions(-)
(flink) branch master updated: [hotfix][runtime] Make sure run RecordContext#release() in Task thread. (#24705)
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 797cb9cd843 [hotfix][runtime] Make sure run RecordContext#release() in Task thread. (#24705) 797cb9cd843 is described below commit 797cb9cd843b1f7c102339019b3dfc45c2fa329d Author: Wang FeiFan AuthorDate: Thu Apr 25 17:59:27 2024 +0800 [hotfix][runtime] Make sure run RecordContext#release() in Task thread. (#24705) --- .../org/apache/flink/core/state/StateFutureImpl.java | 18 -- .../asyncprocessing/ContextStateFutureImpl.java| 10 +++--- .../flink/runtime/asyncprocessing/RecordContext.java | 17 ++--- .../runtime/asyncprocessing/ReferenceCounted.java | 11 --- .../runtime/asyncprocessing/ReferenceCountedTest.java | 4 ++-- 5 files changed, 43 insertions(+), 17 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java b/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java index d7db0be5311..ed6e5963e75 100644 --- a/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java +++ b/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java @@ -65,7 +65,7 @@ public class StateFutureImpl implements InternalStateFuture { (t) -> { callbackRunner.submit( () -> { -ret.complete(fn.apply(t)); + ret.completeInCallbackRunner(fn.apply(t)); callbackFinished(); }); }); @@ -91,7 +91,7 @@ public class StateFutureImpl implements InternalStateFuture { callbackRunner.submit( () -> { action.accept(t); -ret.complete(null); +ret.completeInCallbackRunner(null); callbackFinished(); }); }); @@ -116,7 +116,7 @@ public class StateFutureImpl implements InternalStateFuture { callbackRunner.submit( () -> { StateFuture su = action.apply(t); -su.thenAccept(ret::complete); + su.thenAccept(ret::completeInCallbackRunner); callbackFinished(); }); }); @@ -153,7 +153,8 @@ public 
class StateFutureImpl implements InternalStateFuture { (t) -> { callbackRunner.submit( () -> { - ret.complete(fn.apply(t, u)); + ret.completeInCallbackRunner( + fn.apply(t, u)); callbackFinished(); }); }); @@ -178,7 +179,12 @@ public class StateFutureImpl implements InternalStateFuture { @Override public void complete(T result) { completableFuture.complete(result); -postComplete(); +postComplete(false); +} + +private void completeInCallbackRunner(T result) { +completableFuture.complete(result); +postComplete(true); } /** Will be triggered when a callback is registered. */ @@ -187,7 +193,7 @@ public class StateFutureImpl implements InternalStateFuture { } /** Will be triggered when this future completes. */ -public void postComplete() { +public void postComplete(boolean inCallbackRunner) { // does nothing by default. } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImpl.java index 9355a43a795..cccd815af0e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImpl.java @@ -61,17 +61,21 @@ public class ContextStateFutureImpl extends StateFutureImpl { } @Override -public void postComple
(flink) branch master updated: [FLINK-35156][Runtime] Make operators of DataStream V2 integrate with async state processing
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new a87db9333d8 [FLINK-35156][Runtime] Make operators of DataStream V2 integrate with async state processing a87db9333d8 is described below commit a87db9333d80f720b6ba58e73da17d3d788c16c3 Author: Zakelly AuthorDate: Thu Apr 18 20:08:11 2024 +0800 [FLINK-35156][Runtime] Make operators of DataStream V2 integrate with async state processing --- .../datastream/impl/operators/ProcessOperator.java | 4 +- .../TwoInputBroadcastProcessOperator.java | 4 +- .../TwoInputNonBroadcastProcessOperator.java | 4 +- .../impl/operators/TwoOutputProcessOperator.java | 4 +- .../AbstractAsyncStateUdfStreamOperator.java | 168 + 5 files changed, 176 insertions(+), 8 deletions(-) diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java index 6346079cc21..fb773e271e1 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java @@ -23,15 +23,15 @@ import org.apache.flink.datastream.impl.common.OutputCollector; import org.apache.flink.datastream.impl.common.TimestampCollector; import org.apache.flink.datastream.impl.context.DefaultNonPartitionedContext; import org.apache.flink.datastream.impl.context.DefaultRuntimeContext; -import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; import org.apache.flink.streaming.api.operators.BoundedOneInput; import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import 
org.apache.flink.streaming.runtime.operators.asyncprocessing.AbstractAsyncStateUdfStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; /** Operator for {@link OneInputStreamProcessFunction}. */ public class ProcessOperator -extends AbstractUdfStreamOperator> +extends AbstractAsyncStateUdfStreamOperator> implements OneInputStreamOperator, BoundedOneInput { protected transient DefaultRuntimeContext context; diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java index 5c0faf05bab..0ba5b986fe5 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java @@ -23,17 +23,17 @@ import org.apache.flink.datastream.impl.common.OutputCollector; import org.apache.flink.datastream.impl.common.TimestampCollector; import org.apache.flink.datastream.impl.context.DefaultNonPartitionedContext; import org.apache.flink.datastream.impl.context.DefaultRuntimeContext; -import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; import org.apache.flink.streaming.api.operators.BoundedMultiInput; import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.runtime.operators.asyncprocessing.AbstractAsyncStateUdfStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import static org.apache.flink.util.Preconditions.checkState; /** Operator for {@link TwoInputBroadcastStreamProcessFunction}. 
*/ public class TwoInputBroadcastProcessOperator -extends AbstractUdfStreamOperator< +extends AbstractAsyncStateUdfStreamOperator< OUT, TwoInputBroadcastStreamProcessFunction> implements TwoInputStreamOperator, BoundedMultiInput { diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java index 60d88682c2d..e4f02a10dcb 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java @@ -23,17 +23,17 @@ import org.apache.flink.datastream.impl.common.OutputCollector; import org.apache.flink.datastream.impl.common.TimestampCollector; import org.apache.flink.datastream.impl.context.DefaultNonPartitionedC
(flink) branch master updated (ea2eabe7da5 -> 8475d284f2a)
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from ea2eabe7da5 [FLINK-35214][runtime] Update result partition id for remote input channel when unknown input channel is updated add 8475d284f2a [FLINK-35158][Core/API] Move functional interfaces about consumers from core module to core-api No new revisions were added by this update. Summary of changes: .../util/function/BiConsumerWithException.java | 4 +-- .../util/function/BiFunctionWithException.java | 4 +-- .../flink/util/function/CheckedSupplier.java | 4 +-- .../flink/util/function/FunctionWithException.java | 0 .../util/function/LongFunctionWithException.java | 0 .../apache/flink/util/function/QuadConsumer.java | 0 .../apache/flink/util/function/QuadFunction.java | 0 .../flink/util/function/RunnableWithException.java | 0 .../flink/util/function/SerializableFunction.java | 0 .../flink/util/function/SerializableSupplier.java | 0 .../SerializableSupplierWithException.java | 0 .../flink/util/function/SupplierWithException.java | 0 .../flink/util/function/ThrowingConsumer.java | 0 .../util/function/ThrowingExceptionUtils.java | 32 -- .../flink/util/function/ThrowingRunnable.java | 3 +- .../apache/flink/util/function/TriConsumer.java | 0 .../util/function/TriConsumerWithException.java | 4 +-- .../apache/flink/util/function/TriFunction.java | 0 .../util/function/TriFunctionWithException.java | 3 +- pom.xml | 6 20 files changed, 29 insertions(+), 31 deletions(-) rename {flink-core => flink-core-api}/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java (95%) rename {flink-core => flink-core-api}/src/main/java/org/apache/flink/util/function/BiFunctionWithException.java (96%) rename {flink-core => flink-core-api}/src/main/java/org/apache/flink/util/function/CheckedSupplier.java (94%) rename {flink-core =>
flink-core-api}/src/main/java/org/apache/flink/util/function/FunctionWithException.java (100%) rename {flink-core => flink-core-api}/src/main/java/org/apache/flink/util/function/LongFunctionWithException.java (100%) rename {flink-core => flink-core-api}/src/main/java/org/apache/flink/util/function/QuadConsumer.java (100%) rename {flink-core => flink-core-api}/src/main/java/org/apache/flink/util/function/QuadFunction.java (100%) rename {flink-core => flink-core-api}/src/main/java/org/apache/flink/util/function/RunnableWithException.java (100%) rename {flink-core => flink-core-api}/src/main/java/org/apache/flink/util/function/SerializableFunction.java (100%) rename {flink-core => flink-core-api}/src/main/java/org/apache/flink/util/function/SerializableSupplier.java (100%) rename {flink-core => flink-core-api}/src/main/java/org/apache/flink/util/function/SerializableSupplierWithException.java (100%) copy {flink-core => flink-core-api}/src/main/java/org/apache/flink/util/function/SupplierWithException.java (100%) rename {flink-core => flink-core-api}/src/main/java/org/apache/flink/util/function/ThrowingConsumer.java (100%) rename flink-core/src/main/java/org/apache/flink/util/function/SupplierWithException.java => flink-core-api/src/main/java/org/apache/flink/util/function/ThrowingExceptionUtils.java (56%) rename {flink-core => flink-core-api}/src/main/java/org/apache/flink/util/function/ThrowingRunnable.java (95%) rename {flink-core => flink-core-api}/src/main/java/org/apache/flink/util/function/TriConsumer.java (100%) rename {flink-core => flink-core-api}/src/main/java/org/apache/flink/util/function/TriConsumerWithException.java (96%) rename {flink-core => flink-core-api}/src/main/java/org/apache/flink/util/function/TriFunction.java (100%) rename {flink-core => flink-core-api}/src/main/java/org/apache/flink/util/function/TriFunctionWithException.java (96%)
(flink) branch master updated (51d015b5704 -> 0b2e9880354)
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 51d015b5704 [FLINK-35097][table] Use var-args in RawFormatSerDeSchemaTest add 0b2e9880354 [FLINK-35028][runtime] Timer firing under async execution model No new revisions were added by this update. Summary of changes: .../asyncprocessing/AsyncExecutionController.java | 5 +- .../runtime/asyncprocessing/KeyAccountingUnit.java | 1 - .../runtime/asyncprocessing/RecordContext.java | 3 +- .../api/operators/AbstractStreamOperator.java | 2 +- .../api/operators/AbstractStreamOperatorV2.java | 3 +- .../api/operators/InternalTimeServiceManager.java | 16 ++ .../operators/InternalTimeServiceManagerImpl.java | 49 + .../operators/InternalTimerServiceAsyncImpl.java | 137 + .../api/operators/InternalTimerServiceImpl.java | 22 +- .../BatchExecutionInternalTimeServiceManager.java | 12 ++ .../AbstractAsyncStateStreamOperator.java | 54 + .../AbstractAsyncStateStreamOperatorV2.java | 31 +++ .../InternalTimerServiceAsyncImplTest.java | 221 + .../AbstractAsyncStateStreamOperatorTest.java | 26 +++ .../AbstractAsyncStateStreamOperatorV2Test.java | 24 +++ 15 files changed, 589 insertions(+), 17 deletions(-) create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImpl.java create mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImplTest.java
(flink) branch master updated: [FLINK-35027][runtime/checkpoint] Implement checkpoint drain in AsyncExecutionController
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 9336760ace7 [FLINK-35027][runtime/checkpoint] Implement checkpoint drain in AsyncExecutionController 9336760ace7 is described below commit 9336760ace73a0c84b949370eb773d71f5e3ac1f Author: fredia AuthorDate: Thu Apr 18 12:32:30 2024 +0800 [FLINK-35027][runtime/checkpoint] Implement checkpoint drain in AsyncExecutionController --- .../asyncprocessing/AsyncExecutionController.java | 45 ++- .../runtime/asyncprocessing/StateRequest.java | 2 +- .../api/operators/AbstractStreamOperator.java | 2 +- .../api/operators/AbstractStreamOperatorV2.java| 2 +- .../AbstractAsyncStateStreamOperator.java | 19 .../AbstractAsyncStateStreamOperatorV2.java| 19 .../AbstractAsyncStateStreamOperatorTest.java | 51 ++ .../AbstractAsyncStateStreamOperatorV2Test.java| 49 + 8 files changed, 174 insertions(+), 15 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java index 1964ec3f378..cfa0e4f0d0e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.asyncprocessing; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.operators.MailboxExecutor; import org.apache.flink.api.common.state.v2.State; import org.apache.flink.core.state.InternalStateFuture; @@ -80,7 +81,7 @@ public class AsyncExecutionController { private final StateFutureFactory stateFutureFactory; /** The state executor where the {@link StateRequest} is actually executed. 
*/ -final StateExecutor stateExecutor; +StateExecutor stateExecutor; /** The corresponding context that currently runs in task thread. */ RecordContext currentContext; @@ -235,17 +236,7 @@ public class AsyncExecutionController { // 2. If the state request is for a newly entered record, the in-flight record number should // be less than the max in-flight record number. // Note: the currentContext may be updated by {@code StateFutureFactory#build}. -try { -while (inFlightRecordNum.get() > maxInFlightRecordNum) { -if (!mailboxExecutor.tryYield()) { -triggerIfNeeded(true); -Thread.sleep(1); -} -} -} catch (InterruptedException ignored) { -// ignore the interrupted exception to avoid throwing fatal error when the task cancel -// or exit. -} +drainInflightRecords(maxInFlightRecordNum); // 3. Ensure the currentContext is restored. setCurrentContext(storedContext); inFlightRecordNum.incrementAndGet(); @@ -269,4 +260,34 @@ public class AsyncExecutionController { } }); } + +/** + * A helper function to drain in-flight records util {@link #inFlightRecordNum} within the limit + * of given {@code targetNum}. + * + * @param targetNum the target {@link #inFlightRecordNum} to achieve. + */ +public void drainInflightRecords(int targetNum) { +try { +while (inFlightRecordNum.get() > targetNum) { +if (!mailboxExecutor.tryYield()) { +triggerIfNeeded(true); +Thread.sleep(1); +} +} +} catch (InterruptedException ignored) { +// ignore the interrupted exception to avoid throwing fatal error when the task cancel +// or exit. 
+} +} + +@VisibleForTesting +public void setStateExecutor(StateExecutor stateExecutor) { +this.stateExecutor = stateExecutor; +} + +@VisibleForTesting +public int getInFlightRecordNum() { +return inFlightRecordNum.get(); +} } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java index aa050cf6557..59009361751 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java @@ -78,7 +78,7 @@ public class StateRequest implements Serializable { return state; } -InternalStateFuture getFuture() { +
(flink) 02/03: [FLINK-35025][Runtime/State] Abstract stream operators for async state processing
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit e1edb24ecc2e28987803ded3b460323c0179a052 Author: Zakelly AuthorDate: Fri Apr 12 21:52:29 2024 +0800 [FLINK-35025][Runtime/State] Abstract stream operators for async state processing --- .../streaming/api/operators/AbstractInput.java | 23 +++- .../api/operators/AbstractStreamOperator.java | 4 +- .../api/operators/AbstractStreamOperatorV2.java| 2 +- .../streaming/runtime/io/RecordProcessorUtils.java | 10 ++ .../AbstractAsyncStateStreamOperator.java | 133 .../AbstractAsyncStateStreamOperatorV2.java| 108 .../asyncprocessing/AsyncStateProcessing.java | 65 ++ .../AsyncStateProcessingOperator.java | 44 +++ .../AbstractAsyncStateStreamOperatorTest.java | 98 +++ .../AbstractAsyncStateStreamOperatorV2Test.java| 140 + .../multipleinput/input/FirstInputOfTwoInput.java | 18 ++- .../operators/multipleinput/input/OneInput.java| 18 ++- .../multipleinput/input/SecondInputOfTwoInput.java | 18 ++- 13 files changed, 674 insertions(+), 7 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java index 9982d12c8e8..20c3006d838 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java @@ -19,12 +19,16 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.annotation.Experimental; +import org.apache.flink.annotation.Internal; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessing; +import 
org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessingOperator; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; +import org.apache.flink.util.function.ThrowingConsumer; import javax.annotation.Nullable; @@ -35,7 +39,8 @@ import static org.apache.flink.util.Preconditions.checkArgument; * AbstractStreamOperatorV2}. */ @Experimental -public abstract class AbstractInput implements Input, KeyContextHandler { +public abstract class AbstractInput +implements Input, KeyContextHandler, AsyncStateProcessing { /** * {@code KeySelector} for extracting a key from an element being processed. This is used to * scope keyed state to a key. This is null if the operator is not a keyed operator. @@ -86,4 +91,20 @@ public abstract class AbstractInput implements Input, KeyContextHan public boolean hasKeyContext() { return stateKeySelector != null; } + +@Internal +@Override +public final boolean isAsyncStateProcessingEnabled() { +return (owner instanceof AsyncStateProcessingOperator) +&& ((AsyncStateProcessingOperator) owner).isAsyncStateProcessingEnabled(); +} + +@Internal +@Override +public final ThrowingConsumer, Exception> getRecordProcessor(int inputId) { +return AsyncStateProcessing.makeRecordProcessor( +(AsyncStateProcessingOperator) owner, +(KeySelector) stateKeySelector, +this::processElement); +} } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 632b89b16ae..76fef0c26cf 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -128,7 +128,7 @@ public abstract class AbstractStreamOperator * * This is for elements from the first input. */ -private transient KeySelector stateKeySelector1; +protected transient KeySelector stateKeySelector1; /** * {@code KeySelector} for extracting a key from an element being processed. This is used to @@ -136,7 +136,7 @@ public abstract class AbstractStreamOperator * * This is for elements from the second input. */ -private transient KeySelector stateKeySelector2; +protected transient KeySelector stateKeySelector2; privat
(flink) branch master updated: [FLINK-34936][Checkpointing] Register reused shared state handle to FileMergingSnapshotManager
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 31ea1a9358f [FLINK-34936][Checkpointing] Register reused shared state handle to FileMergingSnapshotManager 31ea1a9358f is described below commit 31ea1a9358f4aaa1083c6fbb7bdee070be29f4e6 Author: Zakelly AuthorDate: Tue Apr 9 20:16:31 2024 +0800 [FLINK-34936][Checkpointing] Register reused shared state handle to FileMergingSnapshotManager --- .../filemerging/FileMergingSnapshotManager.java| 12 + .../FileMergingSnapshotManagerBase.java| 38 +++-- .../metadata/MetadataV2V3SerializerBase.java | 6 ++- .../runtime/state/CheckpointStreamFactory.java | 12 + .../filemerging/EmptySegmentFileStateHandle.java | 15 -- .../state/filemerging/SegmentFileStateHandle.java | 21 ++-- .../FsMergingCheckpointStorageLocation.java| 6 +++ .../FileMergingSnapshotManagerTestBase.java| 62 ++ .../checkpoint/metadata/CheckpointTestUtils.java | 8 ++- ...FileMergingCheckpointStateOutputStreamTest.java | 8 ++- .../snapshot/RocksIncrementalSnapshotStrategy.java | 15 +- 11 files changed, 189 insertions(+), 14 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java index afc5eb618dc..2aa32ba65c0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java @@ -24,12 +24,14 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.CheckpointedStateScope; +import 
org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.TaskStateManager; import org.apache.flink.runtime.state.filemerging.DirectoryStreamStateHandle; import org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream; import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageAccess; import java.io.Closeable; +import java.util.Collection; /** * FileMergingSnapshotManager provides an interface to manage files and meta information for @@ -157,6 +159,16 @@ public interface FileMergingSnapshotManager extends Closeable { */ void notifyCheckpointSubsumed(SubtaskKey subtaskKey, long checkpointId) throws Exception; +/** + * A callback method which is called when previous state handles are reused by following + * checkpoint(s). + * + * @param checkpointId the checkpoint that reuses the handles. + * @param stateHandles the handles to be reused. + */ +void reusePreviousStateHandle( +long checkpointId, Collection stateHandles); + /** * A key identifies a subtask. A subtask can be identified by the operator id, subtask index and * the parallelism. Note that this key should be consistent across job attempts. 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java index d46edcd235e..db90d654d61 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java @@ -27,6 +27,7 @@ import org.apache.flink.core.fs.OutputStreamAndPath; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.checkpoint.filemerging.LogicalFile.LogicalFileId; import org.apache.flink.runtime.state.CheckpointedStateScope; +import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.filemerging.DirectoryStreamStateHandle; import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle; import org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream; @@ -42,6 +43,7 @@ import javax.annotation.concurrent.GuardedBy; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; +import java.util.Collection; import java.util.HashSet; import java.util.Iterator; import java.util.Map; @@ -70,6 +72,9 @@ public abstract class FileMergingSnapshotManagerBase implements FileMergingSnaps @GuardedBy("lock") protected TreeMap> uploadedStates = new TreeMap<>(); +/** The map that holds all the known live logical files. */ +
(flink) 03/03: [FLINK-35025][Runtime/State] Introduce element order for async state processing
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit fe8dde4ea489f466982ba896c1e2b22a7466e719 Author: Zakelly AuthorDate: Mon Apr 15 20:53:47 2024 +0800 [FLINK-35025][Runtime/State] Introduce element order for async state processing --- .../asyncprocessing/AsyncExecutionController.java | 21 .../asyncprocessing/StateRequestBuffer.java| 8 +- .../AsyncExecutionControllerTest.java | 44 +++ .../AbstractAsyncStateStreamOperator.java | 42 --- .../AbstractAsyncStateStreamOperatorV2.java| 17 +++ .../asyncprocessing/AsyncStateProcessing.java | 23 +++- .../AsyncStateProcessingOperator.java | 12 ++ ...teProcessingOperator.java => ElementOrder.java} | 28 +++-- .../AbstractAsyncStateStreamOperatorTest.java | 108 - .../AbstractAsyncStateStreamOperatorV2Test.java| 130 +++-- 10 files changed, 380 insertions(+), 53 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java index 8dd8d16e721..1964ec3f378 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java @@ -21,7 +21,9 @@ package org.apache.flink.runtime.asyncprocessing; import org.apache.flink.api.common.operators.MailboxExecutor; import org.apache.flink.api.common.state.v2.State; import org.apache.flink.core.state.InternalStateFuture; +import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.ThrowingRunnable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -248,4 +250,23 @@ public class AsyncExecutionController { setCurrentContext(storedContext); inFlightRecordNum.incrementAndGet(); } + +/** + 
* A helper to request a {@link StateRequestType#SYNC_POINT} and run a callback if it finishes + * (once the record is not blocked). + * + * @param callback the callback to run if it finishes (once the record is not blocked). + */ +public void syncPointRequestWithCallback(ThrowingRunnable callback) { +handleRequest(null, StateRequestType.SYNC_POINT, null) +.thenAccept( +v -> { +try { +callback.run(); +} catch (Exception e) { +// TODO: Properly handle the exception and fail the entire job. +throw new FlinkRuntimeException("Unexpected runtime exception", e); +} +}); +} } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java index 0eea7bbcacc..9fa10bfee5b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java @@ -60,7 +60,11 @@ public class StateRequestBuffer { } void enqueueToActive(StateRequest request) { -activeQueue.add(request); +if (request.getRequestType() == StateRequestType.SYNC_POINT) { +request.getFuture().complete(null); +} else { +activeQueue.add(request); +} } void enqueueToBlocking(StateRequest request) { @@ -83,7 +87,7 @@ public class StateRequestBuffer { } StateRequest stateRequest = blockingQueue.get(key).removeFirst(); -activeQueue.add(stateRequest); +enqueueToActive(stateRequest); if (blockingQueue.get(key).isEmpty()) { blockingQueue.remove(key); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java index 5d661958fc4..d36a5e53ae6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java @@ -300,6 +300,50 @@ class AsyncExecutionControllerTest { } } +@Test +public void testSyncPoint() { +AtomicInteger counter = new AtomicInteger(0); + +// Test the sync point processing without a key occupied. +RecordContext recordContext = aec.buil
(flink) branch master updated (31ea1a9358f -> fe8dde4ea48)
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 31ea1a9358f [FLINK-34936][Checkpointing] Register reused shared state handle to FileMergingSnapshotManager new c7be45d0ba9 [FLINK-35025][Runtime/State] Erase the type parameter of record within AEC and related components. new e1edb24ecc2 [FLINK-35025][Runtime/State] Abstract stream operators for async state processing new fe8dde4ea48 [FLINK-35025][Runtime/State] Introduce element order for async state processing The 3 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../asyncprocessing/AsyncExecutionController.java | 44 +++- .../asyncprocessing/ContextStateFutureImpl.java| 4 +- .../runtime/asyncprocessing/KeyAccountingUnit.java | 9 +- .../runtime/asyncprocessing/RecordContext.java | 14 +- .../asyncprocessing/StateFutureFactory.java| 9 +- .../runtime/asyncprocessing/StateRequest.java | 6 +- .../asyncprocessing/StateRequestBuffer.java| 29 ++- .../AsyncExecutionControllerTest.java | 68 +- .../ContextStateFutureImplTest.java| 10 +- .../asyncprocessing/KeyAccountingUnitTest.java | 2 +- .../streaming/api/operators/AbstractInput.java | 23 +- .../api/operators/AbstractStreamOperator.java | 4 +- .../api/operators/AbstractStreamOperatorV2.java| 2 +- .../streaming/runtime/io/RecordProcessorUtils.java | 10 + .../AbstractAsyncStateStreamOperator.java | 145 .../AbstractAsyncStateStreamOperatorV2.java| 125 +++ .../asyncprocessing/AsyncStateProcessing.java | 78 +++ .../AsyncStateProcessingOperator.java | 56 + .../operators/asyncprocessing/ElementOrder.java| 42 .../AbstractAsyncStateStreamOperatorTest.java | 196 .../AbstractAsyncStateStreamOperatorV2Test.java| 248 + .../multipleinput/input/FirstInputOfTwoInput.java | 
18 +- .../operators/multipleinput/input/OneInput.java| 18 +- .../multipleinput/input/SecondInputOfTwoInput.java | 18 +- 24 files changed, 1108 insertions(+), 70 deletions(-) create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AsyncStateProcessing.java create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AsyncStateProcessingOperator.java create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/ElementOrder.java create mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorTest.java create mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2Test.java
(flink) 01/03: [FLINK-35025][Runtime/State] Erase the type parameter of record within AEC and related components.
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit c7be45d0ba9c4ab6c2ce7757b6c8cd8e1f0fe539 Author: Zakelly AuthorDate: Fri Apr 12 18:34:07 2024 +0800 [FLINK-35025][Runtime/State] Erase the type parameter of record within AEC and related components. --- .../asyncprocessing/AsyncExecutionController.java | 23 ++--- .../asyncprocessing/ContextStateFutureImpl.java| 4 ++-- .../runtime/asyncprocessing/KeyAccountingUnit.java | 9 .../runtime/asyncprocessing/RecordContext.java | 14 ++--- .../asyncprocessing/StateFutureFactory.java| 9 .../runtime/asyncprocessing/StateRequest.java | 6 +++--- .../asyncprocessing/StateRequestBuffer.java| 21 +++ .../AsyncExecutionControllerTest.java | 24 +++--- .../ContextStateFutureImplTest.java| 10 - .../asyncprocessing/KeyAccountingUnitTest.java | 2 +- 10 files changed, 61 insertions(+), 61 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java index cf8304a71ea..8dd8d16e721 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java @@ -42,10 +42,9 @@ import java.util.concurrent.atomic.AtomicInteger; * (back-pressure). It invokes {@link MailboxExecutor#yield()} to pause current operations, * allowing for the execution of callbacks (mails in Mailbox). 
* - * @param <R> the type of the record * @param <K> the type of the key */ -public class AsyncExecutionController<R, K> { +public class AsyncExecutionController<K> { private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutionController.class); @@ -70,22 +69,22 @@ public class AsyncExecutionController<R, K> { private final MailboxExecutor mailboxExecutor; /** The key accounting unit which is used to detect the key conflict. */ -final KeyAccountingUnit<R, K> keyAccountingUnit; +final KeyAccountingUnit<K> keyAccountingUnit; /** * A factory to build {@link org.apache.flink.core.state.InternalStateFuture}, this will auto * wire the created future with mailbox executor. Also conducting the context switch. */ -private final StateFutureFactory<R, K> stateFutureFactory; +private final StateFutureFactory<K> stateFutureFactory; /** The state executor where the {@link StateRequest} is actually executed. */ final StateExecutor stateExecutor; /** The corresponding context that currently runs in task thread. */ -RecordContext<R, K> currentContext; +RecordContext<K> currentContext; /** The buffer to store the state requests to execute in batch. */ -StateRequestBuffer<R, K> stateRequestsBuffer; +StateRequestBuffer<K> stateRequestsBuffer; /** * The number of in-flight records. Including the records in active buffer and blocking buffer. @@ -123,7 +122,7 @@ public class AsyncExecutionController<R, K> { * @param key the given key. * @return the built record context. */ -public RecordContext<R, K> buildContext(R record, K key) { +public RecordContext<K> buildContext(Object record, K key) { return new RecordContext<>(record, key, this::disposeContext); } @@ -133,7 +132,7 @@ public class AsyncExecutionController<R, K> { * * @param switchingContext the context to switch. */ -public void setCurrentContext(RecordContext<R, K> switchingContext) { +public void setCurrentContext(RecordContext<K> switchingContext) { currentContext = switchingContext; } @@ -142,10 +141,10 @@ public class AsyncExecutionController<R, K> { * * @param toDispose the context to dispose.
*/ -public void disposeContext(RecordContext<R, K> toDispose) { +public void disposeContext(RecordContext<K> toDispose) { keyAccountingUnit.release(toDispose.getRecord(), toDispose.getKey()); inFlightRecordNum.decrementAndGet(); -RecordContext<R, K> nextRecordCtx = +RecordContext<K> nextRecordCtx = stateRequestsBuffer.tryActivateOneByKey(toDispose.getKey()); if (nextRecordCtx != null) { Preconditions.checkState( @@ -160,7 +159,7 @@ public class AsyncExecutionController<R, K> { * @param recordContext the given context. * @return true if occupy succeed or the key has already occupied by this context. */ -boolean tryOccupyKey(RecordContext<R, K> recordContext) { +boolean tryOccupyKey(RecordContext<K> recordContext) { boolean occupied = recordContext.isKeyOccupied(); if (!occupied && keyAccount
(flink) branch master updated: [FLINK-32440][checkpoint] Introduce file merging configuration (#22973)
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new c601c70e5c4 [FLINK-32440][checkpoint] Introduce file merging configuration (#22973) c601c70e5c4 is described below commit c601c70e5c4a9316bee049c613c011b14fab7f5e Author: Yanfei Lei AuthorDate: Mon Apr 15 14:21:40 2024 +0800 [FLINK-32440][checkpoint] Introduce file merging configuration (#22973) --- .../generated/checkpoint_file_merging_section.html | 36 +++ .../generated/checkpointing_configuration.html | 24 + .../flink/annotation/docs/Documentation.java | 2 + .../flink/configuration/CheckpointingOptions.java | 107 + .../FileMergingSnapshotManagerBuilder.java | 4 +- .../runtime/state/CheckpointStorageWorkerView.java | 2 +- .../state/TaskExecutorFileMergingManager.java | 45 - .../flink/runtime/taskexecutor/TaskExecutor.java | 5 +- .../state/TaskExecutorFileMergingManagerTest.java | 14 ++- .../flink/streaming/runtime/tasks/StreamTask.java | 25 - 10 files changed, 248 insertions(+), 16 deletions(-) diff --git a/docs/layouts/shortcodes/generated/checkpoint_file_merging_section.html b/docs/layouts/shortcodes/generated/checkpoint_file_merging_section.html new file mode 100644 index 000..50ee139e0ee --- /dev/null +++ b/docs/layouts/shortcodes/generated/checkpoint_file_merging_section.html @@ -0,0 +1,36 @@ + + + +Key +Default +Type +Description + + + + +state.checkpoints.file-merging.enabled +false +Boolean +Whether to enable merging multiple checkpoint files into one, which will greatly reduce the number of small checkpoint files. This is an experimental feature under evaluation, make sure you're aware of the possible effects of enabling it. 
+ + + state.checkpoints.file-merging.across-checkpoint-boundary +false +Boolean +Only relevant if state.checkpoints.file-merging.enabled is enabled. Whether to allow merging data of multiple checkpoints into one physical file. If this option is set to false, only merge files within checkpoint boundaries. Otherwise, it is possible for the logical files of different checkpoints to share the same physical file. + + +state.checkpoints.file-merging.max-file-size +32 mb +MemorySize +Max size of a physical file for merged checkpoints. + + +state.checkpoints.file-merging.pool-blocking +false +Boolean +Whether to use Blocking or Non-Blocking pool for merging physical files. A Non-Blocking pool will always provide usable physical file without blocking. It may create many physical files if poll file frequently. When poll a small file from a Blocking pool, it may be blocked until the file is returned. + + + diff --git a/docs/layouts/shortcodes/generated/checkpointing_configuration.html b/docs/layouts/shortcodes/generated/checkpointing_configuration.html index c87a9c33803..3b9fda388e4 100644 --- a/docs/layouts/shortcodes/generated/checkpointing_configuration.html +++ b/docs/layouts/shortcodes/generated/checkpointing_configuration.html @@ -44,6 +44,30 @@ String The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes(i.e. all TaskManagers and JobManagers). If the 'state.checkpoint-storage' is set to 'jobmanager', only the meta data of checkpoints will be stored in this directory. + + state.checkpoints.file-merging.across-checkpoint-boundary +false +Boolean +Only relevant if state.checkpoints.file-merging.enabled is enabled. Whether to allow merging data of multiple checkpoints into one physical file. If this option is set to false, only merge files within checkpoint boundaries.
Otherwise, it is possible for the logical files of different checkpoints to share the same physical file. + + +state.checkpoints.file-merging.enabled +false +Boolean +Whether to enable merging multiple checkpoint files into one, which will greatly reduce the number of small checkpoint files. This is an experimental feature under evaluation, make sure you're aware of the possible effects of enabling it. + + +state.checkpoints.file-merging.max-file-size +32 mb +MemorySize +Max size of a physic
(flink) branch master updated: [FLINK-35024][Runtime/State] Implement the record buffer of AsyncExecutionController (#24633)
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 6d139a19c80 [FLINK-35024][Runtime/State] Implement the record buffer of AsyncExecutionController (#24633) 6d139a19c80 is described below commit 6d139a19c809f787317c5afa4e56e1c544125e5f Author: Yanfei Lei AuthorDate: Mon Apr 15 13:56:53 2024 +0800 [FLINK-35024][Runtime/State] Implement the record buffer of AsyncExecutionController (#24633) --- .../asyncprocessing/AsyncExecutionController.java | 98 +++- .../asyncprocessing/StateRequestBuffer.java| 125 ++ .../AsyncExecutionControllerTest.java | 259 ++--- 3 files changed, 385 insertions(+), 97 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java index ffecc5f9687..cf8304a71ea 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java @@ -21,12 +21,16 @@ package org.apache.flink.runtime.asyncprocessing; import org.apache.flink.api.common.operators.MailboxExecutor; import org.apache.flink.api.common.state.v2.State; import org.apache.flink.core.state.InternalStateFuture; +import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + /** * The Async Execution Controller (AEC) receives processing requests from operators, and put them * into execution according to some strategies. 
@@ -45,11 +49,26 @@ public class AsyncExecutionController { private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutionController.class); +public static final int DEFAULT_BATCH_SIZE = 1000; public static final int DEFAULT_MAX_IN_FLIGHT_RECORD_NUM = 6000; +/** + * The batch size. When the number of state requests in the active buffer exceeds the batch + * size, a batched state execution would be triggered. + */ +private final int batchSize; + /** The max allowed number of in-flight records. */ private final int maxInFlightRecordNum; +/** + * The mailbox executor borrowed from {@code StreamTask}. Keeping the reference of + * mailboxExecutor here is to restrict the number of in-flight records, when the number of + * in-flight records > {@link #maxInFlightRecordNum}, the newly entering records would be + * blocked. + */ +private final MailboxExecutor mailboxExecutor; + /** The key accounting unit which is used to detect the key conflict. */ final KeyAccountingUnit keyAccountingUnit; @@ -65,17 +84,35 @@ public class AsyncExecutionController { /** The corresponding context that currently runs in task thread. */ RecordContext currentContext; +/** The buffer to store the state requests to execute in batch. */ +StateRequestBuffer stateRequestsBuffer; + +/** + * The number of in-flight records. Including the records in active buffer and blocking buffer. 
+ */ +final AtomicInteger inFlightRecordNum; + public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) { -this(mailboxExecutor, stateExecutor, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM); +this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM); } public AsyncExecutionController( -MailboxExecutor mailboxExecutor, StateExecutor stateExecutor, int maxInFlightRecords) { +MailboxExecutor mailboxExecutor, +StateExecutor stateExecutor, +int batchSize, +int maxInFlightRecords) { this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords); +this.mailboxExecutor = mailboxExecutor; this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor); this.stateExecutor = stateExecutor; +this.batchSize = batchSize; this.maxInFlightRecordNum = maxInFlightRecords; -LOG.info("Create AsyncExecutionController: maxInFlightRecordsNum {}", maxInFlightRecords); +this.stateRequestsBuffer = new StateRequestBuffer<>(); +this.inFlightRecordNum = new AtomicInteger(0); +LOG.info( +"Create AsyncExecutionController: batchSize {}, maxInFlightRecordsNum {}", +batchSize, +maxInFlightRecords); } /** @@ -107,6 +144,14 @@ public class AsyncExecutionController { */
(flink) 02/03: [FLINK-34986][Runtime/State] Basic implementation of AEC, RecordContext and reference counting
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 294084872604d599c1e2c52302ce0f08bb0e6492 Author: Zakelly AuthorDate: Wed Apr 3 18:39:17 2024 +0800 [FLINK-34986][Runtime/State] Basic implementation of AEC, RecordContext and reference counting --- .../flink/core/state/CompletedStateFuture.java | 5 + .../flink/core/state/InternalStateFuture.java | 3 + .../apache/flink/core/state/StateFutureImpl.java | 48 +++- .../asyncprocessing/AsyncExecutionController.java | 170 .../asyncprocessing/ContextStateFutureImpl.java| 75 + .../KeyAccountingUnit.java | 51 ++-- .../RecordContext.java | 57 +++- .../ReferenceCounted.java | 19 +- .../runtime/asyncprocessing/StateExecutor.java | 21 +- .../asyncprocessing/StateFutureFactory.java| 51 .../runtime/asyncprocessing/StateRequest.java | 88 ++ .../runtime/asyncprocessing/StateRequestType.java | 103 +++ .../taskprocessing/AsyncExecutionController.java | 80 -- .../runtime/taskprocessing/OrderPreserveMode.java | 38 --- .../runtime/taskprocessing/ProcessingRequest.java | 69 - .../runtime/taskprocessing/StateExecutor.java | 19 -- .../AsyncExecutionControllerTest.java | 301 + .../KeyAccountingUnitTest.java}| 21 +- .../ReferenceCountedTest.java | 11 +- .../AsyncExecutionControllerTest.java | 280 --- 20 files changed, 938 insertions(+), 572 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/core/state/CompletedStateFuture.java b/flink-core/src/main/java/org/apache/flink/core/state/CompletedStateFuture.java index 615b8e78c32..13b7de90cee 100644 --- a/flink-core/src/main/java/org/apache/flink/core/state/CompletedStateFuture.java +++ b/flink-core/src/main/java/org/apache/flink/core/state/CompletedStateFuture.java @@ -67,6 +67,11 @@ public class CompletedStateFuture implements InternalStateFuture { }); } +@Override +public void complete(T result) { +throw new UnsupportedOperationException("This state 
future has already been completed."); +} + @Override public void thenSyncAccept(Consumer action) { action.accept(result); diff --git a/flink-core/src/main/java/org/apache/flink/core/state/InternalStateFuture.java b/flink-core/src/main/java/org/apache/flink/core/state/InternalStateFuture.java index 7fd6d7485ff..bbbd4a94f71 100644 --- a/flink-core/src/main/java/org/apache/flink/core/state/InternalStateFuture.java +++ b/flink-core/src/main/java/org/apache/flink/core/state/InternalStateFuture.java @@ -29,6 +29,9 @@ import java.util.function.Consumer; @Internal public interface InternalStateFuture extends StateFuture { +/** Complete this future. */ +void complete(T result); + /** * Accept the action in the same thread with the one of complete (or current thread if it has * been completed). diff --git a/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java b/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java index dfae92c845f..63de46cc91e 100644 --- a/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java +++ b/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java @@ -40,11 +40,11 @@ import java.util.function.Function; @Internal public class StateFutureImpl implements InternalStateFuture { -/** The future holds the result. The completes in async threads. */ -CompletableFuture completableFuture; +/** The future holds the result. This may complete in async threads. */ +private final CompletableFuture completableFuture; /** The callback runner. 
*/ -CallbackRunner callbackRunner; +protected final CallbackRunner callbackRunner; public StateFutureImpl(CallbackRunner callbackRunner) { this.completableFuture = new CompletableFuture<>(); @@ -53,17 +53,20 @@ public class StateFutureImpl implements InternalStateFuture { @Override public StateFuture thenApply(Function fn) { +callbackRegistered(); try { if (completableFuture.isDone()) { U r = fn.apply(completableFuture.get()); +callbackFinished(); return StateFutureUtils.completedFuture(r); } else { -StateFutureImpl ret = new StateFutureImpl<>(callbackRunner); +StateFutureImpl ret = makeNewStateFuture(); completableFuture.thenAccept( (t) -> { callbackRunner.submit( () -> {
(flink) 01/03: [FLINK-34986][Runtime/State] Introduce element-control component
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 60c771b987af73382f8a41ccf7fecfdbe6dc8b1c Author: fredia AuthorDate: Thu Mar 21 17:51:23 2024 +0800 [FLINK-34986][Runtime/State] Introduce element-control component --- .../apache/flink/core/state/StateFutureImpl.java | 2 +- .../taskprocessing/AsyncExecutionController.java | 80 ++ .../runtime/taskprocessing/KeyAccountingUnit.java | 82 ++ .../runtime/taskprocessing/OrderPreserveMode.java | 38 +++ .../runtime/taskprocessing/ProcessingRequest.java | 69 + .../runtime/taskprocessing/RecordContext.java | 82 ++ .../runtime/taskprocessing/ReferenceCounted.java | 98 .../runtime/taskprocessing/StateExecutor.java | 19 ++ .../AsyncExecutionControllerTest.java | 280 + .../taskprocessing/ReferenceCountedTest.java | 71 ++ .../taskprocessing/keyAccountingUnitTest.java | 43 11 files changed, 863 insertions(+), 1 deletion(-) diff --git a/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java b/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java index 3cdc873e789..dfae92c845f 100644 --- a/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java +++ b/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java @@ -46,7 +46,7 @@ public class StateFutureImpl implements InternalStateFuture { /** The callback runner. 
*/ CallbackRunner callbackRunner; -StateFutureImpl(CallbackRunner callbackRunner) { +public StateFutureImpl(CallbackRunner callbackRunner) { this.completableFuture = new CompletableFuture<>(); this.callbackRunner = callbackRunner; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/AsyncExecutionController.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/AsyncExecutionController.java new file mode 100644 index 000..133e6dcd27a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/AsyncExecutionController.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskprocessing; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.operators.MailboxExecutor; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; + +/** + * The Async Execution Controller (AEC) receives processing requests from operators, and put them + * into execution according to some strategies. 
+ * + * It is responsible for: + * Preserving the sequence of elements bearing the same key by delaying subsequent requests + * until the processing of preceding ones is finalized. + * Tracking the in-flight data(records) and blocking the input if too much data in flight + * (back-pressure). It invokes {@link MailboxExecutor#yield()} to pause current operations, + * allowing for the execution of callbacks (mails in Mailbox). + * + * @param the type of the record + * @param the type of the key + */ +@Internal +public class AsyncExecutionController { + +private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutionController.class); + +public static final int DEFAULT_MAX_IN_FLIGHT_RECORD_NUM = 6000; + +/** The max allow number of in-flight records. */ +private final int maxInFlightRecordNum; + +/** The key accounting unit which is used to detect the key conflict. */ +private final KeyAccountingUnit keyAccountingUnit; + +/** The mailbox executor, borrowed from {@code StreamTask}. */ +private final MailboxExecutor mailboxExecutor; + +/** The state executor where the {@link ProcessingRequest} is actually executed. */ +private final StateExecutor stateExecutor; + +public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) { +this(mailboxExecutor, stateExecutor, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM); +} + +publi
(flink) 03/03: [FLINK-34986][Runtime/State] More tests for reference counting of ContextStateFutureImpl
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit ec2a140daed15ef596a70f9fb645c55a23c84a2c Author: Zakelly AuthorDate: Sun Apr 7 11:47:14 2024 +0800 [FLINK-34986][Runtime/State] More tests for reference counting of ContextStateFutureImpl --- .../apache/flink/core/state/StateFutureImpl.java | 3 +- .../asyncprocessing/ContextStateFutureImpl.java| 2 + .../AsyncExecutionControllerTest.java | 8 +- .../ContextStateFutureImplTest.java| 241 + 4 files changed, 249 insertions(+), 5 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java b/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java index 63de46cc91e..d7db0be5311 100644 --- a/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java +++ b/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java @@ -166,7 +166,8 @@ public class StateFutureImpl implements InternalStateFuture { } /** - * Make a new future based on context of this future. + * Make a new future based on context of this future. Subclasses need to overload this method to + * generate their own instances (if needed). * * @return the new created future. */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImpl.java index f1f182b25fe..31e89676523 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImpl.java @@ -32,6 +32,8 @@ import org.apache.flink.core.state.StateFutureImpl; * 2. -1 when future completed. * 3. +1 when callback registered. * 4. -1 when callback finished. 
+ * Please refer to {@code ContextStateFutureImplTest} where the reference counting is carefully + * tested. */ public class ContextStateFutureImpl extends StateFutureImpl { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java index b3bae8ae6a0..41c2031414d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java @@ -170,7 +170,7 @@ class AsyncExecutionControllerTest { * An AsyncExecutionController for testing purpose, which integrates with basic buffer * mechanism. */ -class TestAsyncExecutionController extends AsyncExecutionController { +static class TestAsyncExecutionController extends AsyncExecutionController { LinkedList> activeBuffer; @@ -216,7 +216,7 @@ class AsyncExecutionControllerTest { } /** Simulate the underlying state that is actually used to execute the request. */ -class TestUnderlyingState { +static class TestUnderlyingState { private final HashMap hashMap; @@ -233,7 +233,7 @@ class AsyncExecutionControllerTest { } } -class TestValueState implements ValueState { +static class TestValueState implements ValueState { private final AsyncExecutionController asyncExecutionController; @@ -266,7 +266,7 @@ class AsyncExecutionControllerTest { * A brief implementation of {@link StateExecutor}, to illustrate the interaction between AEC * and StateExecutor. 
*/ -class TestStateExecutor implements StateExecutor { +static class TestStateExecutor implements StateExecutor { public TestStateExecutor() {} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImplTest.java new file mode 100644 index 000..d0a169f6444 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImplTest.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distribu
(flink) branch master updated (54f789d21ef -> ec2a140daed)
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 54f789d21ef [hotfix][doc] Generate and use docs of RpcOptions instead of AkkaOptions (#24629) new 60c771b987a [FLINK-34986][Runtime/State] Introduce element-control component new 29408487260 [FLINK-34986][Runtime/State] Basic implementation of AEC, RecordContext and reference counting new ec2a140daed [FLINK-34986][Runtime/State] More tests for reference counting of ContextStateFutureImpl The 3 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../flink/core/state/CompletedStateFuture.java | 5 + .../flink/core/state/InternalStateFuture.java | 3 + .../apache/flink/core/state/StateFutureImpl.java | 51 +++- .../asyncprocessing/AsyncExecutionController.java | 170 .../asyncprocessing/ContextStateFutureImpl.java| 77 ++ .../runtime/asyncprocessing/KeyAccountingUnit.java | 67 + .../runtime/asyncprocessing/RecordContext.java | 119 .../runtime/asyncprocessing/ReferenceCounted.java | 103 +++ .../runtime/asyncprocessing/StateExecutor.java | 21 +- .../asyncprocessing/StateFutureFactory.java| 51 .../runtime/asyncprocessing/StateRequest.java | 88 ++ .../runtime/asyncprocessing/StateRequestType.java | 103 +++ .../AsyncExecutionControllerTest.java | 301 + .../ContextStateFutureImplTest.java| 241 + .../asyncprocessing/KeyAccountingUnitTest.java | 40 +++ .../asyncprocessing/ReferenceCountedTest.java | 72 + 16 files changed, 1492 insertions(+), 20 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImpl.java create mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/KeyAccountingUnit.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ReferenceCounted.java copy flink-core/src/main/java/org/apache/flink/core/state/InternalStateFuture.java => flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateExecutor.java (63%) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateFutureFactory.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestType.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImplTest.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/KeyAccountingUnitTest.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/ReferenceCountedTest.java
(flink) branch master updated: [hotfix][doc] Generate and use docs of RpcOptions instead of AkkaOptions (#24629)
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 54f789d21ef [hotfix][doc] Generate and use docs of RpcOptions instead of AkkaOptions (#24629) 54f789d21ef is described below commit 54f789d21ef8616f5ea2aa495ffecb56c0430bc9 Author: Zakelly AuthorDate: Thu Apr 11 10:07:12 2024 +0800 [hotfix][doc] Generate and use docs of RpcOptions instead of AkkaOptions (#24629) --- docs/content.zh/docs/deployment/config.md | 2 +- docs/content/docs/deployment/config.md | 2 +- .../generated/{akka_configuration.html => rpc_configuration.html} | 0 3 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/content.zh/docs/deployment/config.md b/docs/content.zh/docs/deployment/config.md index 56d9206ab06..711d97aef40 100644 --- a/docs/content.zh/docs/deployment/config.md +++ b/docs/content.zh/docs/deployment/config.md @@ -562,7 +562,7 @@ These options are for the network stack that handles the streaming and batch dat Flink uses Pekko for RPC between components (JobManager/TaskManager/ResourceManager). Flink does not use Pekko for data transport. -{{< generated/akka_configuration >}} +{{< generated/rpc_configuration >}} diff --git a/docs/content/docs/deployment/config.md b/docs/content/docs/deployment/config.md index f7927e260b4..cc903eb34f2 100644 --- a/docs/content/docs/deployment/config.md +++ b/docs/content/docs/deployment/config.md @@ -564,7 +564,7 @@ These options are for the network stack that handles the streaming and batch dat Flink uses Pekko for RPC between components (JobManager/TaskManager/ResourceManager). Flink does not use Pekko for data transport. 
-{{< generated/akka_configuration >}} +{{< generated/rpc_configuration >}} diff --git a/docs/layouts/shortcodes/generated/akka_configuration.html b/docs/layouts/shortcodes/generated/rpc_configuration.html similarity index 100% rename from docs/layouts/shortcodes/generated/akka_configuration.html rename to docs/layouts/shortcodes/generated/rpc_configuration.html
(flink) branch master updated: [FLINK-28440][State/changelog] Increase the materialization Interval of ChangelogRecoveryITCase#testMaterialization
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 10522ff67ec [FLINK-28440][State/changelog] Increase the materialization Interval of ChangelogRecoveryITCase#testMaterialization 10522ff67ec is described below commit 10522ff67ec1b120374deb181ddaf4a4ee7bffb9 Author: fredia AuthorDate: Tue Apr 9 16:17:58 2024 +0800 [FLINK-28440][State/changelog] Increase the materialization Interval of ChangelogRecoveryITCase#testMaterialization --- .../org/apache/flink/test/checkpointing/ChangelogRecoveryITCase.java| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoveryITCase.java index e554ab75f96..b5976633d4e 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoveryITCase.java @@ -92,7 +92,7 @@ public class ChangelogRecoveryITCase extends ChangelogRecoveryITCaseBase { SharedReference> currentMaterializationId = sharedObjects.add(ConcurrentHashMap.newKeySet()); StreamExecutionEnvironment env = -getEnv(delegatedStateBackend, checkpointFolder, 100, 2, 50, 0); +getEnv(delegatedStateBackend, checkpointFolder, 100, 2, 200, 0); waitAndAssert( buildJobGraph( env,
(flink) branch master updated: [FLINK-34668][checkpoint] Report operator state handle of file merging directory to JM
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 05b27be4112 [FLINK-34668][checkpoint] Report operator state handle of file merging directory to JM 05b27be4112 is described below commit 05b27be4112e1f5ca4c5614b1714a202c13343b6 Author: fredia AuthorDate: Mon Mar 18 14:05:52 2024 +0800 [FLINK-34668][checkpoint] Report operator state handle of file merging directory to JM --- .../filemerging/FileMergingSnapshotManager.java| 12 ++ .../FileMergingSnapshotManagerBase.java| 32 + ...efaultOperatorStateBackendSnapshotStrategy.java | 27 +++- .../filemerging/DirectoryStreamStateHandle.java| 90 .../EmptyFileMergingOperatorStreamStateHandle.java | 64 + .../filemerging/EmptySegmentFileStateHandle.java | 46 ++ .../FileMergingOperatorStreamStateHandle.java | 154 + .../filemerging/SegmentFileStateHandle.java| 5 +- .../FileMergingCheckpointStateOutputStream.java| 2 +- .../FsMergingCheckpointStorageLocation.java| 11 ++ ...ssCheckpointFileMergingSnapshotManagerTest.java | 1 + .../FileMergingSnapshotManagerTestBase.java| 1 + ...inCheckpointFileMergingSnapshotManagerTest.java | 1 + .../runtime/state/OperatorStateBackendTest.java| 85 .../runtime/state/SharedStateRegistryTest.java | 86 ...FileMergingCheckpointStateOutputStreamTest.java | 2 +- .../FsMergingCheckpointStorageLocationTest.java| 2 +- 17 files changed, 614 insertions(+), 7 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java index cc54854a7a3..afc5eb618dc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.CheckpointedStateScope; import org.apache.flink.runtime.state.TaskStateManager; +import org.apache.flink.runtime.state.filemerging.DirectoryStreamStateHandle; import org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream; import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageAccess; @@ -117,6 +118,17 @@ public interface FileMergingSnapshotManager extends Closeable { */ Path getManagedDir(SubtaskKey subtaskKey, CheckpointedStateScope scope); +/** + * Get the {@link DirectoryStreamStateHandle} of the managed directory, created in {@link + * #initFileSystem} or {@link #registerSubtaskForSharedStates}. + * + * @param subtaskKey the subtask key identifying the subtask. + * @param scope the checkpoint scope. + * @return the {@link DirectoryStreamStateHandle} for one subtask in specified checkpoint scope. + */ +DirectoryStreamStateHandle getManagedDirStateHandle( +SubtaskKey subtaskKey, CheckpointedStateScope scope); + /** * Notifies the manager that the checkpoint with the given {@code checkpointId} completed and * was committed. 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java index 3dc9c788561..d46edcd235e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java @@ -27,6 +27,8 @@ import org.apache.flink.core.fs.OutputStreamAndPath; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.checkpoint.filemerging.LogicalFile.LogicalFileId; import org.apache.flink.runtime.state.CheckpointedStateScope; +import org.apache.flink.runtime.state.filemerging.DirectoryStreamStateHandle; +import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle; import org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; @@ -37,6 +39,7 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; import javax.annotation.concurrent.GuardedBy; +import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.util.HashSet; @@ -105,12 +108,24 @@ public abstract class
(flink) branch master updated (f7593524579 -> 501de487de9)
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from f7593524579 [FLINK-34228] Add long UTF serializer/deserializer add 501de487de9 [FLINK-34667][state/changelog] Changelog state backend support local rescaling No new revisions were added by this update. Summary of changes: .../inmemory/InMemoryStateChangelogWriter.java | 4 +++- .../state/changelog/ChangelogKeyedStateBackend.java | 21 - .../test/checkpointing/AutoRescalingITCase.java | 3 --- 3 files changed, 19 insertions(+), 9 deletions(-)
(flink) 01/03: [FLINK-34484][state]Move LOCAL_RECOVERY config from CheckpointingOptions to StateRecoveryOption
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 1d1dca6d6b1a80c4fcd3337489a99c0d834692ea Author: Jinzhong Li AuthorDate: Tue Feb 27 18:08:03 2024 +0800 [FLINK-34484][state]Move LOCAL_RECOVERY config from CheckpointingOptions to StateRecoveryOption --- .../generated/checkpointing_configuration.html | 6 -- .../generated/common_state_backends_section.html| 6 -- .../generated/state_recovery_configuration.html | 6 ++ .../flink/configuration/CheckpointingOptions.java | 4 .../flink/configuration/StateRecoveryOptions.java | 17 + .../flink/streaming/tests/AllroundMiniClusterTest.java | 4 ++-- .../DefaultSlotPoolServiceSchedulerFactory.java | 3 ++- .../scheduler/ExecutionSlotAllocationContext.java | 3 +-- .../taskexecutor/TaskManagerServicesConfiguration.java | 3 ++- .../flink/runtime/util/SlotSelectionStrategyUtils.java | 4 ++-- .../DefaultSlotPoolServiceSchedulerFactoryTest.java | 4 ++-- .../state/TaskExecutorLocalStateStoresManagerTest.java | 3 ++- .../runtime/taskexecutor/TaskExecutorRecoveryTest.java | 4 ++-- .../runtime/util/SlotSelectionStrategyUtilsTest.java| 6 +++--- .../flink/test/checkpointing/AutoRescalingITCase.java | 3 ++- .../checkpointing/ChangelogLocalRecoveryITCase.java | 2 +- .../flink/test/checkpointing/LocalRecoveryITCase.java | 4 ++-- .../checkpointing/NotifyCheckpointAbortedITCase.java| 4 ++-- .../checkpointing/ResumeCheckpointManuallyITCase.java | 3 ++- .../apache/flink/test/recovery/LocalRecoveryITCase.java | 4 ++-- .../runtime/DefaultSchedulerLocalRecoveryITCase.java| 4 ++-- .../org/apache/flink/test/runtime/SchedulingITCase.java | 6 +++--- .../test/state/ChangelogRecoveryCachingITCase.java | 2 +- .../flink/test/state/ChangelogRescalingITCase.java | 2 +- 24 files changed, 63 insertions(+), 44 deletions(-) diff --git a/docs/layouts/shortcodes/generated/checkpointing_configuration.html 
b/docs/layouts/shortcodes/generated/checkpointing_configuration.html index 2c40c7e1804..0cd3dd53755 100644 --- a/docs/layouts/shortcodes/generated/checkpointing_configuration.html +++ b/docs/layouts/shortcodes/generated/checkpointing_configuration.html @@ -14,12 +14,6 @@ Boolean Option whether the state backend should create incremental checkpoints, if possible. For an incremental checkpoint, only a diff from the previous checkpoint is stored, rather than the complete checkpoint state. Once enabled, the state size shown in web UI or fetched from rest API only represents the delta checkpoint size instead of full checkpoint size. Some state backends may not support incremental checkpoints and ignore this option. - -state.backend.local-recovery -false -Boolean -This option configures local recovery for this state backend. By default, local recovery is deactivated. Local recovery currently only covers keyed state backends (including both the EmbeddedRocksDBStateBackend and the HashMapStateBackend). - state.checkpoint-storage (none) diff --git a/docs/layouts/shortcodes/generated/common_state_backends_section.html b/docs/layouts/shortcodes/generated/common_state_backends_section.html index 7d35697849c..a16650bf412 100644 --- a/docs/layouts/shortcodes/generated/common_state_backends_section.html +++ b/docs/layouts/shortcodes/generated/common_state_backends_section.html @@ -38,12 +38,6 @@ Boolean Option whether the state backend should create incremental checkpoints, if possible. For an incremental checkpoint, only a diff from the previous checkpoint is stored, rather than the complete checkpoint state. Once enabled, the state size shown in web UI or fetched from rest API only represents the delta checkpoint size instead of full checkpoint size. Some state backends may not support incremental checkpoints and ignore this option. - -state.backend.local-recovery -false -Boolean -This option configures local recovery for this state backend. 
By default, local recovery is deactivated. Local recovery currently only covers keyed state backends (including both the EmbeddedRocksDBStateBackend and the HashMapStateBackend). - state.checkpoint.cleaner.parallel-mode true diff --git a/docs/layouts/shortcodes/generated/state_recovery_configuration.html b/docs/layouts/shortcodes/generated/state_recovery_configuration.html index 5c255b2f820..2a1d914e2fd 100644 --- a/docs/layouts/shortcodes/generated/state_recovery_configuration.html +++ b/docs/layouts/shortcodes/generated/state_recovery_configuration.html @@ -14,6 +14,12
(flink) 02/03: [FLINK-34484][state] Split 'state.backend.local-recovery' into two options for checkpointing and recovery
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit ed75795e97800177cb67141ab838632d5ec55bb5 Author: Jinzhong Li AuthorDate: Wed Feb 28 12:19:36 2024 +0800 [FLINK-34484][state] Split 'state.backend.local-recovery' into two options for checkpointing and recovery --- .../generated/checkpointing_configuration.html | 6 .../generated/common_state_backends_section.html | 6 .../generated/state_recovery_configuration.html| 2 +- .../flink/configuration/CheckpointingOptions.java | 24 +- .../flink/configuration/StateRecoveryOptions.java | 16 + .../fs/DuplicatingStateChangeFsUploader.java | 10 +++--- .../changelog/fs/FsStateChangelogStorage.java | 2 +- .../flink/changelog/fs/FsStateChangelogWriter.java | 2 +- .../changelog/fs/StateChangeUploadScheduler.java | 2 +- .../api/runtime/SavepointTaskStateManager.java | 2 +- .../DefaultSlotPoolServiceSchedulerFactory.java| 1 - .../state/ChangelogTaskLocalStateStore.java| 2 +- .../state/CheckpointStreamWithResultProvider.java | 2 +- .../flink/runtime/state/LocalRecoveryConfig.java | 38 +++--- ...er.java => LocalSnapshotDirectoryProvider.java} | 14 ...ava => LocalSnapshotDirectoryProviderImpl.java} | 10 +++--- .../state/TaskExecutorLocalStateStoresManager.java | 21 +++- .../runtime/state/TaskLocalStateStoreImpl.java | 14 +++- .../runtime/state/heap/HeapSnapshotStrategy.java | 2 +- .../runtime/taskexecutor/TaskManagerServices.java | 1 + .../TaskManagerServicesConfiguration.java | 14 ++-- .../state/ChangelogTaskLocalStateStoreTest.java| 15 + .../CheckpointStreamWithResultProviderTest.java| 8 ++--- ...=> LocalSnapshotDirectoryProviderImplTest.java} | 10 +++--- .../TaskExecutorLocalStateStoresManagerTest.java | 2 +- .../runtime/state/TaskLocalStateStoreImplTest.java | 7 ++-- .../runtime/state/TaskStateManagerImplTest.java| 7 ++-- .../runtime/state/TestLocalRecoveryConfig.java | 6 ++-- 
.../state/changelog/ChangelogStateDiscardTest.java | 2 +- .../snapshot/RocksDBSnapshotStrategyBase.java | 8 ++--- .../runtime/tasks/LocalStateForwardingTest.java| 9 ++--- .../runtime/tasks/StreamTaskTestHarness.java | 6 ++-- .../benchmark/StateBackendBenchmarkUtils.java | 4 +-- .../restore/StreamOperatorSnapshotRestoreTest.java | 13 +--- 34 files changed, 194 insertions(+), 94 deletions(-) diff --git a/docs/layouts/shortcodes/generated/checkpointing_configuration.html b/docs/layouts/shortcodes/generated/checkpointing_configuration.html index 0cd3dd53755..c87a9c33803 100644 --- a/docs/layouts/shortcodes/generated/checkpointing_configuration.html +++ b/docs/layouts/shortcodes/generated/checkpointing_configuration.html @@ -8,6 +8,12 @@ + +execution.checkpointing.local-backup.enabled +false +Boolean +This option configures local backup for the state backend, which indicates whether to make backup checkpoint on local disk. If not configured, fallback to execution.state-recovery.from-local. By default, local backup is deactivated. Local backup currently only covers keyed state backends (including both the EmbeddedRocksDBStateBackend and the HashMapStateBackend). + state.backend.incremental false diff --git a/docs/layouts/shortcodes/generated/common_state_backends_section.html b/docs/layouts/shortcodes/generated/common_state_backends_section.html index a16650bf412..ab664f51bfb 100644 --- a/docs/layouts/shortcodes/generated/common_state_backends_section.html +++ b/docs/layouts/shortcodes/generated/common_state_backends_section.html @@ -32,6 +32,12 @@ String The default directory for savepoints. Used by the state backends that write savepoints to file systems (HashMapStateBackend, EmbeddedRocksDBStateBackend). + +execution.state-recovery.from-local +false +Boolean +This option configures local recovery for the state backend, which indicates whether to recovery from local snapshot.By default, local recovery is deactivated. 
Local recovery currently only covers keyed state backends (including both the EmbeddedRocksDBStateBackend and the HashMapStateBackend)." + state.backend.incremental false diff --git a/docs/layouts/shortcodes/generated/state_recovery_configuration.html b/docs/layouts/shortcodes/generated/state_recovery_configuration.html index 2a1d914e2fd..2df071ea4d5 100644 --- a/docs/layouts/shortcodes/generated/state_recovery_configuration.html +++ b/docs/layouts/shortcodes/generated/state_recovery_configuration.html @@ -18,7 +18,7 @@
(flink) 03/03: [FLINK-34484][tests] Add UT/IT tests to verify localBackup and localRecovery configs
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 649e2b412e10c7f698721cbbd8697f9b02248977 Author: Jinzhong Li AuthorDate: Wed Feb 28 18:44:30 2024 +0800 [FLINK-34484][tests] Add UT/IT tests to verify localBackup and localRecovery configs --- .../TaskExecutorLocalStateStoresManagerTest.java | 31 +-- .../runtime/state/TaskLocalStateStoreImplTest.java | 22 +++ ...cutorExecutionDeploymentReconciliationTest.java | 1 + .../TaskExecutorPartitionLifecycleTest.java| 1 + .../taskexecutor/TaskExecutorSlotLifetimeTest.java | 1 + .../runtime/taskexecutor/TaskExecutorTest.java | 2 + .../TaskSubmissionTestEnvironment.java | 1 + .../test/checkpointing/LocalRecoveryITCase.java| 46 ++ 8 files changed, 94 insertions(+), 11 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java index 31c2f406f30..513375f3cb7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java @@ -152,7 +152,7 @@ class TaskExecutorLocalStateStoresManagerTest { } @Test -void testLocalStateNoCreateDirWhenDisabledLocalRecovery() throws Exception { +void testLocalStateNoCreateDirWhenDisabledLocalBackupAndRecovery() throws Exception { JobID jobID = new JobID(); JobVertexID jobVertexID = new JobVertexID(); AllocationID allocationID = new AllocationID(); @@ -165,9 +165,11 @@ class TaskExecutorLocalStateStoresManagerTest { }; boolean localRecoveryEnabled = false; +boolean localBackupEnabled = false; TaskExecutorLocalStateStoresManager storesManager = new TaskExecutorLocalStateStoresManager( localRecoveryEnabled, +localBackupEnabled, 
Reference.owned(rootDirs), Executors.directExecutor()); @@ -196,6 +198,21 @@ class TaskExecutorLocalStateStoresManagerTest { */ @Test void testSubtaskStateStoreDirectoryCreateAndDelete() throws Exception { +testSubtaskStateStoreDirectoryCreateAndDelete(true, true); +} + +@Test +void testStateStoreDirectoryCreateAndDeleteWithLocalRecoveryEnabled() throws Exception { +testSubtaskStateStoreDirectoryCreateAndDelete(true, false); +} + +@Test +void testStateStoreDirectoryCreateAndDeleteWithLocalBackupEnabled() throws Exception { +testSubtaskStateStoreDirectoryCreateAndDelete(false, true); +} + +private void testSubtaskStateStoreDirectoryCreateAndDelete( +boolean localRecoveryEnabled, boolean localBackupEnabled) throws Exception { JobID jobID = new JobID(); JobVertexID jobVertexID = new JobVertexID(); @@ -209,7 +226,10 @@ class TaskExecutorLocalStateStoresManagerTest { }; TaskExecutorLocalStateStoresManager storesManager = new TaskExecutorLocalStateStoresManager( -true, Reference.owned(rootDirs), Executors.directExecutor()); +localRecoveryEnabled, +localBackupEnabled, +Reference.owned(rootDirs), +Executors.directExecutor()); TaskLocalStateStore taskLocalStateStore = storesManager.localStateStoreForSubtask( @@ -305,7 +325,10 @@ class TaskExecutorLocalStateStoresManagerTest { final TaskExecutorLocalStateStoresManager taskExecutorLocalStateStoresManager = new TaskExecutorLocalStateStoresManager( -true, Reference.owned(localStateDirectories), Executors.directExecutor()); +true, +true, +Reference.owned(localStateDirectories), +Executors.directExecutor()); for (File localStateDirectory : localStateDirectories) { assertThat(localStateDirectory).exists(); @@ -327,6 +350,7 @@ class TaskExecutorLocalStateStoresManagerTest { final TaskExecutorLocalStateStoresManager taskExecutorLocalStateStoresManager = new TaskExecutorLocalStateStoresManager( +true, true, Reference.borrowed(localStateDirectories), Executors.directExecutor()); @@ -348,6 +372,7 @@ class 
TaskExecutorLocalStateStoresManagerTest { final File localStateStore = TempDirUtils.newFolder(temporaryFolder.toPath()); final
(flink) branch master updated (3b9623e5d2e -> 649e2b412e1)
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 3b9623e5d2e [FLINK-32076][checkpoint] Introduce blocking file pool to reuse files (#24418) new 1d1dca6d6b1 [FLINK-34484][state]Move LOCAL_RECOVERY config from CheckpointingOptions to StateRecoveryOption new ed75795e978 [FLINK-34484][state] Split 'state.backend.local-recovery' into two options for checkpointing and recovery new 649e2b412e1 [FLINK-34484][tests] Add UT/IT tests to verify localBackup and localRecovery configs The 3 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../generated/checkpointing_configuration.html | 8 ++-- .../generated/common_state_backends_section.html | 8 ++-- .../generated/state_recovery_configuration.html| 6 +++ .../flink/configuration/CheckpointingOptions.java | 26 .../flink/configuration/StateRecoveryOptions.java | 19 + .../fs/DuplicatingStateChangeFsUploader.java | 10 ++--- .../changelog/fs/FsStateChangelogStorage.java | 2 +- .../flink/changelog/fs/FsStateChangelogWriter.java | 2 +- .../changelog/fs/StateChangeUploadScheduler.java | 2 +- .../streaming/tests/AllroundMiniClusterTest.java | 4 +- .../api/runtime/SavepointTaskStateManager.java | 2 +- .../DefaultSlotPoolServiceSchedulerFactory.java| 4 +- .../scheduler/ExecutionSlotAllocationContext.java | 3 +- .../state/ChangelogTaskLocalStateStore.java| 2 +- .../state/CheckpointStreamWithResultProvider.java | 2 +- .../flink/runtime/state/LocalRecoveryConfig.java | 38 +++-- ...er.java => LocalSnapshotDirectoryProvider.java} | 14 +++ ...ava => LocalSnapshotDirectoryProviderImpl.java} | 10 ++--- .../state/TaskExecutorLocalStateStoresManager.java | 21 ++ .../runtime/state/TaskLocalStateStoreImpl.java | 14 ++- 
.../runtime/state/heap/HeapSnapshotStrategy.java | 2 +- .../runtime/taskexecutor/TaskManagerServices.java | 1 + .../TaskManagerServicesConfiguration.java | 15 ++- .../runtime/util/SlotSelectionStrategyUtils.java | 4 +- ...DefaultSlotPoolServiceSchedulerFactoryTest.java | 4 +- .../state/ChangelogTaskLocalStateStoreTest.java| 15 +++ .../CheckpointStreamWithResultProviderTest.java| 8 ++-- ...=> LocalSnapshotDirectoryProviderImplTest.java} | 10 ++--- .../TaskExecutorLocalStateStoresManagerTest.java | 36 +--- .../runtime/state/TaskLocalStateStoreImplTest.java | 29 +++-- .../runtime/state/TaskStateManagerImplTest.java| 7 ++-- .../runtime/state/TestLocalRecoveryConfig.java | 6 +-- ...cutorExecutionDeploymentReconciliationTest.java | 1 + .../TaskExecutorPartitionLifecycleTest.java| 1 + .../taskexecutor/TaskExecutorRecoveryTest.java | 4 +- .../taskexecutor/TaskExecutorSlotLifetimeTest.java | 1 + .../runtime/taskexecutor/TaskExecutorTest.java | 2 + .../TaskSubmissionTestEnvironment.java | 1 + .../util/SlotSelectionStrategyUtilsTest.java | 6 +-- .../state/changelog/ChangelogStateDiscardTest.java | 2 +- .../snapshot/RocksDBSnapshotStrategyBase.java | 8 ++-- .../runtime/tasks/LocalStateForwardingTest.java| 9 ++-- .../runtime/tasks/StreamTaskTestHarness.java | 6 +-- .../benchmark/StateBackendBenchmarkUtils.java | 4 +- .../test/checkpointing/AutoRescalingITCase.java| 3 +- .../ChangelogLocalRecoveryITCase.java | 2 +- .../test/checkpointing/LocalRecoveryITCase.java| 48 ++ .../NotifyCheckpointAbortedITCase.java | 4 +- .../ResumeCheckpointManuallyITCase.java| 3 +- .../flink/test/recovery/LocalRecoveryITCase.java | 4 +- .../DefaultSchedulerLocalRecoveryITCase.java | 4 +- .../flink/test/runtime/SchedulingITCase.java | 6 +-- .../test/state/ChangelogRecoveryCachingITCase.java | 2 +- .../flink/test/state/ChangelogRescalingITCase.java | 2 +- .../restore/StreamOperatorSnapshotRestoreTest.java | 13 +++--- 55 files changed, 336 insertions(+), 134 deletions(-) rename 
flink-runtime/src/main/java/org/apache/flink/runtime/state/{LocalRecoveryDirectoryProvider.java => LocalSnapshotDirectoryProvider.java} (83%) rename flink-runtime/src/main/java/org/apache/flink/runtime/state/{LocalRecoveryDirectoryProviderImpl.java => LocalSnapshotDirectoryProviderImpl.java} (93%) rename flink-runtime/src/test/java/org/apache/flink/runtime/state/{LocalRecoveryDirectoryProviderImplTest.java => LocalSnapshotDirectoryProviderImplTest.java} (93%)
(flink) branch master updated: [FLINK-34624][state/changelog] Enable local recovery in ChangelogRescalingITCase (#24470)
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 94b55d1ae61 [FLINK-34624][state/changelog] Enable local recovery in ChangelogRescalingITCase (#24470) 94b55d1ae61 is described below commit 94b55d1ae61257f21c7bb511660e7497f269abc7 Author: Yanfei Lei AuthorDate: Tue Mar 12 11:28:35 2024 +0800 [FLINK-34624][state/changelog] Enable local recovery in ChangelogRescalingITCase (#24470) --- .../streaming/util/TestStreamEnvironment.java | 57 ++ .../test/checkpointing/AutoRescalingITCase.java| 3 ++ .../flink/test/state/ChangelogRescalingITCase.java | 2 +- 3 files changed, 29 insertions(+), 33 deletions(-) diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java index e8f9f2578f6..58ddd379652 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java @@ -35,7 +35,6 @@ import java.time.Duration; import java.util.Collection; import java.util.Collections; -import static org.apache.flink.configuration.CheckpointingOptions.LOCAL_RECOVERY; import static org.apache.flink.runtime.testutils.PseudoRandomValueSelector.randomize; /** A {@link StreamExecutionEnvironment} that executes its jobs on {@link MiniCluster}. 
*/ @@ -125,41 +124,35 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment { } // randomize ITTests for enabling state change log -if (isConfigurationSupportedByChangelog(miniCluster.getConfiguration())) { -if (STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_ON)) { -if (!conf.contains(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG)) { -conf.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, true); -miniCluster.overrideRestoreModeForChangelogStateBackend(); -} -} else if (STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_RAND)) { -boolean enabled = -randomize(conf, StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, true, false); -if (enabled) { -// More situations about enabling periodic materialization should be tested -randomize( -conf, - StateChangelogOptions.PERIODIC_MATERIALIZATION_ENABLED, -true, -true, -true, -false); -randomize( -conf, - StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL, -Duration.ofMillis(100), -Duration.ofMillis(500), -Duration.ofSeconds(1), -Duration.ofSeconds(5)); -miniCluster.overrideRestoreModeForChangelogStateBackend(); -} +if (STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_ON)) { +if (!conf.contains(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG)) { +conf.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, true); +miniCluster.overrideRestoreModeForChangelogStateBackend(); +} +} else if (STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_RAND)) { +boolean enabled = +randomize(conf, StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, true, false); +if (enabled) { +// More situations about enabling periodic materialization should be tested +randomize( +conf, +StateChangelogOptions.PERIODIC_MATERIALIZATION_ENABLED, +true, +true, +true, +false); +randomize( +conf, + StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL, +Duration.ofMillis(100), +Duration.ofMillis(500), +Duration.ofSeconds(1), +Duration.ofSeconds(5)); 
+miniCluster.overrideRestoreModeForChangelogStateBackend(); } } } -private static boolean isConfigurationSupportedByChangelog(Configuration configuration
(flink) branch master updated: [FLINK-34483][docs] Improve the documentation of 'state.checkpoints.dir' and 'state.checkpoint-storage' (#24401)
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 1d8cbe9065e [FLINK-34483][docs] Improve the documentation of 'state.checkpoints.dir' and 'state.checkpoint-storage' (#24401) 1d8cbe9065e is described below commit 1d8cbe9065e2ef34a748513b2469ecfe3626639e Author: Yanfei Lei AuthorDate: Fri Mar 8 17:28:11 2024 +0800 [FLINK-34483][docs] Improve the documentation of 'state.checkpoints.dir' and 'state.checkpoint-storage' (#24401) --- .../generated/checkpointing_configuration.html | 4 ++-- .../generated/common_state_backends_section.html | 4 ++-- .../flink/configuration/CheckpointingOptions.java | 27 -- 3 files changed, 29 insertions(+), 6 deletions(-) diff --git a/docs/layouts/shortcodes/generated/checkpointing_configuration.html b/docs/layouts/shortcodes/generated/checkpointing_configuration.html index 03979d7d6c9..2c40c7e1804 100644 --- a/docs/layouts/shortcodes/generated/checkpointing_configuration.html +++ b/docs/layouts/shortcodes/generated/checkpointing_configuration.html @@ -24,7 +24,7 @@ state.checkpoint-storage (none) String -The checkpoint storage implementation to be used to checkpoint state.The implementation can be specified either via their shortcut name, or via the class name of a CheckpointStorageFactory. If a factory is specified it is instantiated via its zero argument constructor and its CheckpointStorageFactory#createFromConfig(ReadableConfig, ClassLoader) method is called.Recognized shortcut [...] +The checkpoint storage implementation to be used to checkpoint state.The implementation can be specified either via their shortcut name, or via the class name of a CheckpointStorageFactory. 
If a factory is specified it is instantiated via its zero argument constructor and its CheckpointStorageFactory#createFromConfig(ReadableConfig, ClassLoader) method is called.Recognized shortcut [...] state.checkpoint.cleaner.parallel-mode @@ -42,7 +42,7 @@ state.checkpoints.dir (none) String -The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes(i.e. all TaskManagers and JobManagers). +The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes(i.e. all TaskManagers and JobManagers). If the 'state.checkpoint-storage' is set to 'jobmanager', only the meta data of checkpoints will be stored in this directory. state.checkpoints.num-retained diff --git a/docs/layouts/shortcodes/generated/common_state_backends_section.html b/docs/layouts/shortcodes/generated/common_state_backends_section.html index 157cb75c81f..7d35697849c 100644 --- a/docs/layouts/shortcodes/generated/common_state_backends_section.html +++ b/docs/layouts/shortcodes/generated/common_state_backends_section.html @@ -18,13 +18,13 @@ state.checkpoint-storage (none) String -The checkpoint storage implementation to be used to checkpoint state.The implementation can be specified either via their shortcut name, or via the class name of a CheckpointStorageFactory. If a factory is specified it is instantiated via its zero argument constructor and its CheckpointStorageFactory#createFromConfig(ReadableConfig, ClassLoader) method is called.Recognized shortcut [...] +The checkpoint storage implementation to be used to checkpoint state.The implementation can be specified either via their shortcut name, or via the class name of a CheckpointStorageFactory. 
If a factory is specified it is instantiated via its zero argument constructor and its CheckpointStorageFactory#createFromConfig(ReadableConfig, ClassLoader) method is called.Recognized shortcut [...] state.checkpoints.dir (none) String -The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes(i.e. all TaskManagers and JobManagers). +The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes(i.e. all TaskManagers and JobManagers). If the 'state.checkpoint-storage' is set to 'jobma
(flink) branch master updated (e7e973e212d -> 80090c76f88)
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from e7e973e212d [FLINK-34458][checkpointing] Rename options for Generalized incremental checkpoints (changelog) (#24324) add 80090c76f88 [FLINK-34455] Move RestoreMode from flink-runtime to flink-core (#24320) No new revisions were added by this update. Summary of changes: .../src/main/java/org/apache/flink/client/cli/CliFrontendParser.java| 2 +- .../src/main/java/org/apache/flink/client/cli/ProgramOptions.java | 2 +- .../src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java | 2 +- .../container/entrypoint/StandaloneApplicationClusterEntryPoint.java| 2 +- .../src/main/java/org/apache/flink/core/execution}/RestoreMode.java | 2 +- .../highavailability/KubernetesCheckpointRecoveryFactory.java | 2 +- .../main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java| 2 +- .../org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java | 2 +- .../org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java | 2 +- .../flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java | 2 +- .../apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java | 2 +- .../java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java | 2 +- .../org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java | 2 +- .../java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java | 2 +- .../flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java | 2 +- .../flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java | 2 +- .../flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java | 2 +- .../flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java| 2 +- .../flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java| 2 +- .../runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java| 2 +- 
.../java/org/apache/flink/runtime/jobgraph/SavepointConfigOptions.java | 1 + .../org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java | 1 + .../src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java | 2 +- .../main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java| 2 +- .../main/java/org/apache/flink/runtime/state/SharedStateRegistry.java | 2 +- .../java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java | 2 +- .../java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java| 2 +- .../src/main/java/org/apache/flink/runtime/state/StateBackend.java | 2 +- .../src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java | 2 +- .../flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java | 2 +- .../flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java| 2 +- .../org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java | 2 +- .../flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java | 2 +- .../org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java| 2 +- .../flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java | 2 +- .../apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryTest.java | 2 +- .../flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java | 2 +- .../runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java | 2 +- .../flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java | 2 +- .../org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java| 2 +- .../dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java| 2 +- .../org/apache/flink/runtime/jobgraph/SavepointRestoreSettingsTest.java | 2 ++ .../java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java | 2 +- .../java/org/apache/flink/runtime/state/SharedStateRegistryTest.java| 2 +- .../main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java | 2 +- .../flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java | 2 +- 
.../test/checkpointing/ChangelogRecoverySwitchStateBackendITCase.java | 2 +- .../apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java | 2 +- .../java/org/apache/flink/test/checkpointing/SavepointFormatITCase.java | 2 +- .../test/java/org/apache/flink/test/checkpointing/SavepointITCase.java | 2 +- 50 files changed, 51 insertions(+), 47 deletions(-) rename {flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph => flink-core/src/main/java/org/apache/flink/core/execution}/RestoreMode.java (98%)
(flink) branch master updated (6f7e841dd4b -> 26817092445)
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 6f7e841dd4b [FLINK-34479][documentation] Fix missed changelog configs in the documentation (#24354) add 26817092445 [FLINK-32443][docs-zh] Translate "State Processor API" page into Chinese (#23496) No new revisions were added by this update. Summary of changes: docs/content.zh/docs/libs/state_processor_api.md | 168 ++- 1 file changed, 74 insertions(+), 94 deletions(-)
(flink) branch master updated: [FLINK-33863] Fix restoring compressed operator state (#23938)
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new d415d93bbf9 [FLINK-33863] Fix restoring compressed operator state (#23938) d415d93bbf9 is described below commit d415d93bbf9620ba985136469107edd8c6e31cc6 Author: Ruibin Xing AuthorDate: Wed Dec 20 18:04:44 2023 +0800 [FLINK-33863] Fix restoring compressed operator state (#23938) --- .../state/OperatorStateRestoreOperation.java | 30 +- .../state/OperatorStateRestoreOperationTest.java | 114 + 2 files changed, 142 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java index f818eb81978..fd983fd5d28 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java @@ -32,9 +32,12 @@ import org.apache.commons.io.IOUtils; import javax.annotation.Nonnull; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; +import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** Implementation of operator state restore operation. */ public class OperatorStateRestoreOperation implements RestoreOperation { @@ -168,9 +171,32 @@ public class OperatorStateRestoreOperation implements RestoreOperation { } } +List> entries = +new ArrayList<>(stateHandle.getStateNameToPartitionOffsets().entrySet()); + +if (backendSerializationProxy.isUsingStateCompression()) { +// sort state handles by offsets to avoid building SnappyFramedInputStream with +// EOF stream. 
+entries = +entries.stream() +.sorted( +Comparator.comparingLong( +entry -> { + OperatorStateHandle.StateMetaInfo +stateMetaInfo = entry.getValue(); +long[] offsets = stateMetaInfo.getOffsets(); +if (offsets == null +|| offsets.length == 0) { +return Long.MIN_VALUE; +} else { +return offsets[0]; +} +})) +.collect(Collectors.toList()); +} + // Restore all the states -for (Map.Entry nameToOffsets : - stateHandle.getStateNameToPartitionOffsets().entrySet()) { +for (Map.Entry nameToOffsets : entries) { final String stateName = nameToOffsets.getKey(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateRestoreOperationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateRestoreOperationTest.java new file mode 100644 index 000..e0aecd5d723 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateRestoreOperationTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.comm
(flink-benchmarks) branch master updated: [FLINK-33482] Flink benchmark regression check in the machines hosted on Aliyun (#81)
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git The following commit(s) were added to refs/heads/master by this push: new a6fe13c [FLINK-33482] Flink benchmark regression check in the machines hosted on Aliyun (#81) a6fe13c is described below commit a6fe13cc55499f4f26516aebbd8beb71e94cec88 Author: Zakelly AuthorDate: Wed Nov 15 12:37:50 2023 +0800 [FLINK-33482] Flink benchmark regression check in the machines hosted on Aliyun (#81) --- jenkinsfiles/regression-check.jenkinsfile | 4 ++-- regression_report.py | 36 +-- regression_report_v2.py | 18 ++-- 3 files changed, 47 insertions(+), 11 deletions(-) diff --git a/jenkinsfiles/regression-check.jenkinsfile b/jenkinsfiles/regression-check.jenkinsfile index b769a1f..1cbde9f 100644 --- a/jenkinsfiles/regression-check.jenkinsfile +++ b/jenkinsfiles/regression-check.jenkinsfile @@ -18,10 +18,10 @@ timestamps { try { timeout(time: 3, unit: 'HOURS') { // includes waiting for a machine -node('Hetzner') { +node('Aliyun') { dir('flink-benchmarks') { git url: 'https://github.com/apache/flink-benchmarks.git', branch: 'master' -sh './regression_report_v2.py > regression-report' +sh 'python2 ./regression_report_v2.py > regression-report' def alerts = readFile "regression-report" if (alerts) { def attachments = [ diff --git a/regression_report.py b/regression_report.py index fdbdcd7..1ddcedd 100755 --- a/regression_report.py +++ b/regression_report.py @@ -25,8 +25,9 @@ import argparse import json import re -DEFAULT_CODESPEED_URL = 'http://codespeed.dak8s.net:8000/' -ENVIRONMENT = 2 +DEFAULT_CODESPEED_URL = 'http://flink-speed.xyz/' +ENVIRONMENT = 3 +ENVNAME='Aliyun' current_date = datetime.datetime.today() @@ -69,6 +70,37 @@ def loadExecutableAndRevisions(codespeedUrl): revisions[exe] = rev return revisions +""" +Returns a dict executable id -> executable name +""" +def 
loadExecutableNames(codespeedUrl): +names = {} +url = codespeedUrl + 'reports' +f = urllib2.urlopen(url) +response = f.read() +f.close() +for line in response.split('\n'): +# Find urls like: /changes/?rev=b8e7fc387dd-ffcdbb4-1647231150&exe=1&env=Hetzner +# and extract rev and exe params out of it +reports = dict(re.findall(r'([a-z]+)=([a-z0-9\-]+)', line)) +if "exe" in reports and "rev" in reports: +exe = reports["exe"] +name = re.findall('([A-Za-z0-9\-\ \(\)]+)\@' + ENVNAME + '\<\/td\>', line) +# remember only the first (latest) revision for the given executable +if exe not in names and len(name) > 0: +names[exe] = name[0] +return names + +""" +Returns the Java version from the executable name +""" +def extractJavaVersion(name): +result = re.findall('(\Java[0-9]+)', name) +if len(result) > 0: +return result[0] +else: +return "Java8" + """ Returns a dict executable -> benchmark names """ diff --git a/regression_report_v2.py b/regression_report_v2.py index f0389f4..9ce50f5 100755 --- a/regression_report_v2.py +++ b/regression_report_v2.py @@ -22,7 +22,11 @@ import json import urllib import urllib2 +from regression_report import DEFAULT_CODESPEED_URL +from regression_report import ENVIRONMENT from regression_report import loadBenchmarkNames +from regression_report import loadExecutableNames +from regression_report import extractJavaVersion """ The regression detection algorithm calculates the regression ratio as the ratio of change between the current @@ -32,7 +36,6 @@ triggered if the regression ratio exceeds max(minRegressionRatio, minInstability Please refer to https://docs.google.com/document/d/1Bvzvq79Ll5yxd1UtC0YzczgFbZPAgPcN3cI0MjVkIag for more detail. 
""" -ENVIRONMENT = 2 MIN_SAMPLE_SIZE_LIMIT = 5 """ @@ -50,16 +53,16 @@ def loadHistoryData(codespeedUrl, exe, benchmark, baselineSize): return result, lessIsbBetter def detectRegression(urlToBenchmark, stds, scores, baselineSize, minRegressionRatio, minInstabilityMultiplier, - direction): + direction, execName): sustainable_x = [min(scores[i - 3: i]) for i in range(3, min(len(scores), bas
[flink-benchmarks] branch master updated (3013320 -> 60e3522)
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git from 3013320 Bump Flink verion to 1.19-SNAPSHOT add 60e3522 [FLINK-33161] Java17 profile for benchmarking (#80) No new revisions were added by this update. Summary of changes: README.md | 6 + pom.xml | 87 +++ 2 files changed, 93 insertions(+)
[flink] branch master updated: [FLINK-32072][checkpoint] Create and wire FileMergingSnapshotManager with TaskManagerServices (#22890)
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 1112582fd13 [FLINK-32072][checkpoint] Create and wire FileMergingSnapshotManager with TaskManagerServices (#22890) 1112582fd13 is described below commit 1112582fd136df47c6d356d6f6ad3946ad1e56d5 Author: Yanfei Lei <18653940+fre...@users.noreply.github.com> AuthorDate: Tue Sep 19 17:31:04 2023 +0800 [FLINK-32072][checkpoint] Create and wire FileMergingSnapshotManager with TaskManagerServices (#22890) --- .../api/runtime/SavepointTaskStateManager.java | 7 + .../filemerging/FileMergingSnapshotManager.java| 6 +- .../FileMergingSnapshotManagerBuilder.java | 2 - .../runtime/state/CheckpointStorageWorkerView.java | 13 ++ .../state/TaskExecutorFileMergingManager.java | 143 + .../flink/runtime/state/TaskStateManager.java | 3 + .../flink/runtime/state/TaskStateManagerImpl.java | 14 ++ .../filesystem/FsCheckpointStorageAccess.java | 24 +++- .../FsMergingCheckpointStorageAccess.java | 73 +++ .../flink/runtime/taskexecutor/TaskExecutor.java | 14 ++ .../runtime/taskexecutor/TaskManagerServices.java | 13 ++ .../state/TaskExecutorFileMergingManagerTest.java | 87 + .../runtime/state/TaskStateManagerImplTest.java| 4 + .../flink/runtime/state/TestTaskStateManager.java | 7 + .../taskexecutor/TaskManagerServicesBuilder.java | 10 ++ .../runtime/util/JvmExitOnFatalErrorTest.java | 1 + .../flink/streaming/runtime/tasks/StreamTask.java | 29 - .../StateInitializationContextImplTest.java| 1 + .../StreamTaskStateInitializerImplTest.java| 1 + .../runtime/tasks/LocalStateForwardingTest.java| 1 + .../streaming/runtime/tasks/StreamTaskTest.java| 2 + 21 files changed, 446 insertions(+), 9 deletions(-) diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java index cb6193e7724..a1d649d4d8c 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor; import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReader; +import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.LocalRecoveryConfig; import org.apache.flink.runtime.state.TaskStateManager; @@ -114,6 +115,12 @@ final class SavepointTaskStateManager implements TaskStateManager { return null; } +@Nullable +@Override +public FileMergingSnapshotManager getFileMergingSnapshotManager() { +return null; +} + @Override public void notifyCheckpointComplete(long checkpointId) { throw new UnsupportedOperationException(MSG); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java index 4954d8548f3..968e86f91c5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java @@ -17,6 +17,7 @@ package org.apache.flink.runtime.checkpoint.filemerging; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.TaskInfo; import 
org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; @@ -31,7 +32,7 @@ import java.io.Closeable; * FileMergingSnapshotManager provides an interface to manage files and meta information for * checkpoint files with merging checkpoint files enabled. It manages the files for ONE single task * in TM, including all subtasks of this single task that running in this TM. There is one - * FileMergingSnapshotManager for each task per task manager. + * FileMergingSnapshotManager for each job per task manager. * * TODO (FLINK-32073): create output stream. * @@ -125,7 +126,8 @@ public interface FileMergingSnapshotManager extends Closeable { taskInfo.getNumberOfParallelSu
[flink] branch master updated: [FLINK-32769][hotfix][state-changelog] Fix the invalid placeholder {} passed to MailboxExecutor#execute() in PeriodicMaterializationManager (#23153)
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 13469792821 [FLINK-32769][hotfix][state-changelog] Fix the invalid placeholder {} passed to MailboxExecutor#execute() in PeriodicMaterializationManager (#23153) 13469792821 is described below commit 13469792821ee1e9d2171a28a071cc986626316e Author: Wang FeiFan AuthorDate: Fri Sep 15 16:50:58 2023 +0800 [FLINK-32769][hotfix][state-changelog] Fix the invalid placeholder {} passed to MailboxExecutor#execute() in PeriodicMaterializationManager (#23153) Co-authored-by: Yanfei Lei <18653940+fre...@users.noreply.github.com> --- .../org/apache/flink/state/common/PeriodicMaterializationManager.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/PeriodicMaterializationManager.java b/flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/PeriodicMaterializationManager.java index 68bee356208..7989ea31cfb 100644 --- a/flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/PeriodicMaterializationManager.java +++ b/flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/PeriodicMaterializationManager.java @@ -267,7 +267,7 @@ public class PeriodicMaterializationManager implements Closeable { metrics.reportFailedMaterialization(); } }, -"Task {} update materializedSnapshot up to changelog sequence number: {}", +"Task %s update materializedSnapshot up to changelog sequence number: %s.", subtaskName, upTo); @@ -310,7 +310,7 @@ public class PeriodicMaterializationManager implements Closeable { () -> target.handleMaterializationFailureOrCancellation( materializationId, upTo, cause), -"Task {} materialization:{},upTo:{} failed or 
canceled.", +"Task %s materialization: %d, upTo: %s, failed or canceled.", subtaskName, materializationId, upTo);
[flink] branch master updated: [FLINK-30863][state/changelog] Register local recovery files of changelog before notifyCheckpointComplete()
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new c23a3002b83 [FLINK-30863][state/changelog] Register local recovery files of changelog before notifyCheckpointComplete() c23a3002b83 is described below commit c23a3002b837ecffa2d561a405c17c979c814c6e Author: Yanfei Lei <18653940+fre...@users.noreply.github.com> AuthorDate: Tue Aug 29 16:37:16 2023 +0800 [FLINK-30863][state/changelog] Register local recovery files of changelog before notifyCheckpointComplete() This closes https://github.com/apache/flink/pull/21822 --- .../fs/DuplicatingStateChangeFsUploader.java | 12 +- .../changelog/fs/FsStateChangelogStorage.java | 2 + .../flink/changelog/fs/FsStateChangelogWriter.java | 48 ++-- .../changelog/fs/ChangelogStorageMetricsTest.java | 16 +- .../fs/DiscardRecordableStateChangeUploader.java | 3 +- .../fs/FsStateChangelogWriterSqnTest.java | 2 +- .../changelog/fs/FsStateChangelogWriterTest.java | 263 +++-- .../state/changelog/LocalChangelogRegistry.java| 15 +- .../changelog/LocalChangelogRegistryImpl.java | 26 +- .../state/changelog/StateChangelogWriter.java | 11 +- .../inmemory/InMemoryStateChangelogWriter.java | 5 +- .../runtime/state/TestLocalRecoveryConfig.java | 4 + .../changelog/LocalChangelogRegistryTest.java | 2 +- .../inmemory/StateChangelogStorageTest.java| 2 +- .../changelog/ChangelogKeyedStateBackend.java | 2 +- .../state/changelog/ChangelogTruncateHelper.java | 1 - .../state/changelog/StateChangeLoggerTestBase.java | 6 +- 17 files changed, 319 insertions(+), 101 deletions(-) diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java index cf99ed17683..50db749b973 100644 --- 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java @@ -26,6 +26,7 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.ChangelogTaskLocalStateStore; import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider; import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.changelog.LocalChangelogRegistry; import org.apache.flink.runtime.state.filesystem.FileStateHandle; import org.slf4j.Logger; @@ -51,14 +52,13 @@ import static org.apache.flink.runtime.state.ChangelogTaskLocalStateStore.getLoc * Store the meta of files into {@link ChangelogTaskLocalStateStore} by * AsyncCheckpointRunnable#reportCompletedSnapshotStates(). * Pass control of the file to {@link LocalChangelogRegistry#register} when - * ChangelogKeyedStateBackend#notifyCheckpointComplete() , files of the previous - * checkpoint will be deleted by {@link LocalChangelogRegistry#discardUpToCheckpoint} at - * the same time. + * FsStateChangelogWriter#persist , files of the previous checkpoint will be deleted by + * {@link LocalChangelogRegistry#discardUpToCheckpoint} when the checkpoint is confirmed. * When ChangelogTruncateHelper#materialized() or * ChangelogTruncateHelper#checkpointSubsumed() is called, {@link - * TaskChangelogRegistry#notUsed} is responsible for deleting local files. - * When one checkpoint is aborted, the dstl files of this checkpoint will be deleted by - * {@link LocalChangelogRegistry#prune} in {@link FsStateChangelogWriter#reset}. + * TaskChangelogRegistry#release} is responsible for deleting local files. + * When one checkpoint is aborted, all accumulated local dstl files will be deleted at + * once. 
* */ public class DuplicatingStateChangeFsUploader extends AbstractStateChangeFsUploader { diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java index 5ba18f701ea..1d4668cb8e8 100644 --- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.state.changelog.LocalChangelogRegistry; import org.apache.flink.runtime.state.changelog.LocalChangelogRegistryImpl; import org.apache.flink.runtime.state.changelog.StateChangelogStorage; +