(flink) branch master updated: [FLINK-36243][state/forst] Store namespace in state request and contextKey (#25300)

2024-09-11 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 25969c9de1e [FLINK-36243][state/forst] Store namespace in state request and contextKey (#25300)
25969c9de1e is described below

commit 25969c9de1e73c2364200ae3d155aca4862ea036
Author: Yanfei Lei 
AuthorDate: Wed Sep 11 19:45:57 2024 +0800

[FLINK-36243][state/forst] Store namespace in state request and contextKey (#25300)
---
 .../asyncprocessing/AsyncExecutionController.java  |  6 ++---
 .../runtime/asyncprocessing/StateRequest.java  | 12 +-
 .../asyncprocessing/StateRequestBuffer.java| 10 
 .../asyncprocessing/StateRequestContainer.java |  2 +-
 .../runtime/asyncprocessing/MockStateExecutor.java |  2 +-
 .../asyncprocessing/MockStateRequestContainer.java |  6 ++---
 .../state/v2/InternalKeyedStateTestBase.java   |  8 +++
 .../org/apache/flink/state/forst/ContextKey.java   | 14 ++-
 .../apache/flink/state/forst/ForStInnerTable.java  |  4 ++--
 .../apache/flink/state/forst/ForStListState.java   | 15 
 .../apache/flink/state/forst/ForStMapState.java| 28 +++---
 .../state/forst/ForStStateRequestClassifier.java   |  4 ++--
 .../apache/flink/state/forst/ForStValueState.java  | 14 +++
 .../state/forst/ForStDBOperationTestBase.java  |  4 ++--
 .../flink/state/forst/ForStStateExecutorTest.java  | 20 
 15 files changed, 90 insertions(+), 59 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
index 24ae7e9342f..ad767aa340a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
@@ -239,7 +239,7 @@ public class AsyncExecutionController implements 
StateRequestHandler {
 @Nullable State state, StateRequestType type, @Nullable IN 
payload) {
 // Step 1: build state future & assign context.
 InternalStateFuture stateFuture = 
stateFutureFactory.create(currentContext);
-StateRequest<K, IN, OUT> request =
+StateRequest<K, ?, IN, OUT> request =
 new StateRequest<>(state, type, payload, stateFuture, 
currentContext);
 
 // Step 2: try to seize the capacity, if the current in-flight records 
exceeds the limit,
@@ -278,11 +278,11 @@ public class AsyncExecutionController implements 
StateRequestHandler {
 currentContext.setNamespace(state, namespace);
 }
 
-<IN, OUT> void insertActiveBuffer(StateRequest<K, IN, OUT> request) {
+<N, IN, OUT> void insertActiveBuffer(StateRequest<K, N, IN, OUT> request) {
 stateRequestsBuffer.enqueueToActive(request);
 }
 
-<IN, OUT> void insertBlockingBuffer(StateRequest<K, IN, OUT> request) {
+<N, IN, OUT> void insertBlockingBuffer(StateRequest<K, N, IN, OUT> request) {
 stateRequestsBuffer.enqueueToBlocking(request);
 }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java
index 87fd02926f2..32527a2e881 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.asyncprocessing;
 
 import org.apache.flink.api.common.state.v2.State;
 import org.apache.flink.core.state.InternalStateFuture;
+import org.apache.flink.runtime.state.v2.InternalPartitionedState;
 
 import javax.annotation.Nullable;
 
@@ -30,9 +31,10 @@ import java.io.Serializable;
  *
  * @param <K> Type of partitioned key.
  * @param <IN> Type of input of this request.
+ * @param <N> Type of namespace.
  * @param <OUT> Type of value that request will return.
  */
-public class StateRequest<K, IN, OUT> implements Serializable {
+public class StateRequest<K, N, IN, OUT> implements Serializable {
 
 /**
  * The underlying state to be accessed, can be empty for {@link 
StateRequestType#SYNC_POINT}.
@@ -51,6 +53,8 @@ public class StateRequest implements Serializable 
{
 /** The record context of this request. */
 private final RecordContext context;
 
+@Nullable private final N namespace;
+
 public StateRequest(
 @Nullable State state,
 StateRequestType type,
@@ -62,6 +66,8 @@ public class StateRequest implements Serializable 
{
 this.payload = payload;
 this.stateFuture = stateFuture;
 this.context = context;
+this.namespace =
+state == null ? null : 
context.getNamespace((InternalPartitionedState) state);
 }
 
 public StateRequestType getRequestType() 

(flink) branch master updated: [FLINK-34510][Runtime/State]Rename RestoreMode to RecoveryClaimMode (#25192)

2024-09-09 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 7adeecd3445 [FLINK-34510][Runtime/State]Rename RestoreMode to 
RecoveryClaimMode (#25192)
7adeecd3445 is described below

commit 7adeecd3445947f42d3e3d1e2961b9464e910236
Author: lz <971066...@qq.com>
AuthorDate: Tue Sep 10 10:32:06 2024 +0800

[FLINK-34510][Runtime/State]Rename RestoreMode to RecoveryClaimMode (#25192)
---
 docs/static/generated/rest_v1_dispatcher.yml   | 16 ++---
 .../apache/flink/client/cli/CliFrontendParser.java | 16 ++---
 .../apache/flink/client/cli/ProgramOptions.java|  6 +-
 .../flink/client/cli/CliFrontendRunTest.java   | 18 ++---
 .../StandaloneApplicationClusterEntryPoint.java| 12 ++--
 .../flink/configuration/StateRecoveryOptions.java  |  8 +--
 .../{RestoreMode.java => RecoveryClaimMode.java}   |  8 +--
 .../KubernetesCheckpointRecoveryFactory.java   |  6 +-
 .../flink/kubernetes/utils/KubernetesUtils.java|  8 +--
 .../runtime/webmonitor/handlers/JarRunHandler.java | 19 +++---
 .../webmonitor/handlers/JarRunRequestBody.java | 25 +++
 .../handlers/JarRunHandlerParameterTest.java   |  6 +-
 .../webmonitor/handlers/JarRunRequestBodyTest.java |  6 +-
 .../runtime/checkpoint/CheckpointCoordinator.java  |  4 +-
 .../runtime/checkpoint/CheckpointProperties.java   | 16 +++--
 .../checkpoint/CheckpointRecoveryFactory.java  |  6 +-
 .../runtime/checkpoint/CompletedCheckpoint.java|  8 +--
 .../EmbeddedCompletedCheckpointStore.java  |  8 +--
 .../PerJobCheckpointRecoveryFactory.java   | 10 +--
 .../StandaloneCheckpointRecoveryFactory.java   |  6 +-
 .../StandaloneCompletedCheckpointStore.java| 14 ++--
 .../ZooKeeperCheckpointRecoveryFactory.java|  6 +-
 .../cleanup/CheckpointResourcesCleanupRunner.java  |  6 +-
 .../EmbeddedHaServicesWithLeadershipControl.java   |  4 +-
 .../runtime/jobgraph/SavepointConfigOptions.java   |  8 +--
 .../runtime/jobgraph/SavepointRestoreSettings.java | 41 +++-
 .../flink/runtime/minicluster/MiniCluster.java |  6 +-
 .../flink/runtime/scheduler/SchedulerUtils.java|  8 +--
 .../flink/runtime/state/SharedStateRegistry.java   | 13 ++--
 .../runtime/state/SharedStateRegistryFactory.java  |  6 +-
 .../runtime/state/SharedStateRegistryImpl.java |  6 +-
 .../apache/flink/runtime/state/StateBackend.java   |  6 +-
 .../apache/flink/runtime/util/ZooKeeperUtils.java  |  8 +--
 .../flink/streaming/runtime/tasks/StreamTask.java  |  4 +-
 .../CheckpointCoordinatorFailureTest.java  |  4 +-
 .../CheckpointCoordinatorRestoringTest.java|  8 ++-
 .../checkpoint/CheckpointCoordinatorTest.java  | 10 +--
 .../CheckpointCoordinatorTriggeringTest.java   |  8 +--
 .../checkpoint/CompletedCheckpointTest.java|  8 ++-
 .../DefaultCompletedCheckpointStoreTest.java   |  4 +-
 .../checkpoint/PerJobCheckpointRecoveryTest.java   | 10 +--
 .../TestingCheckpointRecoveryFactory.java  |  4 +-
 .../ZooKeeperCompletedCheckpointStoreITCase.java   |  4 +-
 .../ZooKeeperCompletedCheckpointStoreTest.java |  4 +-
 .../dispatcher/DispatcherCleanupITCase.java|  8 +--
 .../CheckpointResourcesCleanupRunnerTest.java  |  4 +-
 .../jobgraph/SavepointRestoreSettingsTest.java |  6 +-
 .../runtime/scheduler/SchedulerUtilsTest.java  | 10 +--
 .../runtime/state/SharedStateRegistryTest.java |  4 +-
 .../plan/nodes/exec/testutils/RestoreTestBase.java |  4 +-
 .../ChangelogRecoverySwitchStateBackendITCase.java |  4 +-
 .../ResumeCheckpointManuallyITCase.java| 77 --
 .../test/checkpointing/SavepointFormatITCase.java  |  4 +-
 .../flink/test/checkpointing/SavepointITCase.java  | 11 ++--
 .../SnapshotFileMergingCompatibilityITCase.java| 48 --
 55 files changed, 313 insertions(+), 279 deletions(-)

diff --git a/docs/static/generated/rest_v1_dispatcher.yml 
b/docs/static/generated/rest_v1_dispatcher.yml
index 1ea036fc53b..ccaa7c6bc24 100644
--- a/docs/static/generated/rest_v1_dispatcher.yml
+++ b/docs/static/generated/rest_v1_dispatcher.yml
@@ -2288,7 +2288,7 @@ components:
 allowNonRestoredState:
   type: boolean
 claimMode:
-  $ref: '#/components/schemas/RestoreMode'
+  $ref: '#/components/schemas/RecoveryClaimMode'
 entryClass:
   type: string
 flinkConfiguration:
@@ -2307,7 +2307,7 @@ components:
   items:
 type: string
 restoreMode:
-  $ref: '#/components/schemas/RestoreMode'
+  $ref: '#/components/schemas/RecoveryClaimMode'
 savepointPath:
   type: string
 JarRunResponseBody:
@@ -2773,6 +2773,12 @@ components:
   $ref: '#/components/schemas/Id'
 RawJs

(flink) branch master updated: [hotfix][checkpoint] Rename file-merging options in documents (#25048)

2024-07-09 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new c15145b4a8c [hotfix][checkpoint] Rename file-merging options in 
documents (#25048)
c15145b4a8c is described below

commit c15145b4a8c99fbfd99c6cec0701db0efb1646a2
Author: Yanfei Lei 
AuthorDate: Wed Jul 10 10:00:24 2024 +0800

[hotfix][checkpoint] Rename file-merging options in documents (#25048)
---
 .../docs/dev/datastream/fault-tolerance/checkpointing.md   | 10 +-
 .../docs/dev/datastream/fault-tolerance/checkpointing.md   | 10 +-
 .../org/apache/flink/configuration/CheckpointingOptions.java   |  7 ---
 .../api/environment/ExecutionCheckpointingOptions.java |  7 ---
 4 files changed, 18 insertions(+), 16 deletions(-)

diff --git 
a/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md 
b/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md
index 9d76b5bec06..3d01e0ffddf 100644
--- a/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md
+++ b/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md
@@ -253,18 +253,18 @@ StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironm
 ##  统一的 checkpoint 文件合并机制 (实验性功能)
 
 Flink 1.20 引入了 MVP 版本的统一 checkpoint 文件合并机制,该机制允许把分散的 checkpoint 小文件合并到大文件中,减少 
checkpoint 文件创建删除的次数,
-有助于减轻文件过多问题带来的文件系统元数据管理的压力。可以通过将 `state.checkpoints.file-merging.enabled` 设置为 
`true` 来开启该机制。
-**注意**,考虑 trade-off,开启该机制会导致空间放大,即文件系统上的实际占用会比 state size 更大,可以通过设置 
`state.checkpoints.file-merging.max-space-amplification`
+有助于减轻文件过多问题带来的文件系统元数据管理的压力。可以通过将 
`execution.checkpointing.file-merging.enabled` 设置为 `true` 来开启该机制。
+**注意**,考虑 trade-off,开启该机制会导致空间放大,即文件系统上的实际占用会比 state size 更大,可以通过设置 
`execution.checkpointing.file-merging.max-space-amplification`
 来控制文件放大的上限。
 
 该机制适用于 Flink 中的 keyed state、operator state 和 channel state。对 shared scope 
state 
 提供 subtask 级别的合并;对 private scope state 提供 TaskManager 级别的合并,可以通过
- `state.checkpoints.file-merging.max-subtasks-per-file` 选项配置单个文件允许写入的最大 
subtask 数目。
+ `execution.checkpointing.file-merging.max-subtasks-per-file` 选项配置单个文件允许写入的最大 
subtask 数目。
 
-统一文件合并机制也支持跨 checkpoint 的文件合并,通过设置 
`state.checkpoints.file-merging.across-checkpoint-boundary` 为 `true` 开启。
+统一文件合并机制也支持跨 checkpoint 的文件合并,通过设置 
`execution.checkpointing.file-merging.across-checkpoint-boundary` 为 `true` 开启。
 
 该机制引入了文件池用于处理并发写的场景,文件池有两种模式,Non-blocking 
模式的文件池会对每个文件请求即时返回一个物理文件,在频繁请求的情况下会创建出许多物理文件;
-而 Blocking 模式的文件池会一直阻塞文件请求,直到文件池中有返回的文件可用,可以通过设置 
`state.checkpoints.file-merging.pool-blocking` 为 `true` 
+而 Blocking 模式的文件池会一直阻塞文件请求,直到文件池中有返回的文件可用,可以通过设置 
`execution.checkpointing.file-merging.pool-blocking` 为 `true` 
 选择 Blocking 模式,设置为 `false` 选择 Non-blocking 模式。
 
 {{< top >}}
diff --git a/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md 
b/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md
index 5d410eb6780..6b3d625696d 100644
--- a/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md
+++ b/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md
@@ -297,21 +297,21 @@ to be completed.
 The unified file merging mechanism for checkpointing is introduced to Flink 
1.20 as an MVP ("minimum viable product") feature, 
 which allows scattered small checkpoint files to be written into larger files, 
reducing the number of file creations 
 and file deletions, which alleviates the pressure of file system metadata 
management raised by the file flooding problem during checkpoints.
-The mechanism can be enabled by setting 
`state.checkpoints.file-merging.enabled` to `true`.
+The mechanism can be enabled by setting 
`execution.checkpointing.file-merging.enabled` to `true`.
 **Note** that as a trade-off, enabling this mechanism may lead to space 
amplification, that is, the actual occupation on the file system
-will be larger than actual state size. 
`state.checkpoints.file-merging.max-space-amplification` 
+will be larger than actual state size. 
`execution.checkpointing.file-merging.max-space-amplification` 
 can be used to limit the upper bound of space amplification.
 
 This mechanism is applicable to keyed state, operator state and channel state 
in Flink. Merging at subtask level is 
 provided for shared scope state; Merging at TaskManager level is provided for 
private scope state. The maximum number of subtasks
-allowed to be written to a single file can be configured through the 
`state.checkpoints.file-merging.max-subtasks-per-file` option.
+allowed to be written to a single file can be configured through the 
`execution.checkpointing.file-merging.max-subtasks-per-file` option.
 
 This feature also supports merging files across checkpoints. To enable this, 
set
-`state.checkpoints.file-merging.across-checkpoint-boundary` to `true`.
+`execution.ch

(flink) branch release-1.20 updated: [hotfix][checkpoint] Rename file-merging options in documents (#25049)

2024-07-09 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.20 by this push:
 new 0bdbfeb41be [hotfix][checkpoint] Rename file-merging options in 
documents (#25049)
0bdbfeb41be is described below

commit 0bdbfeb41be53c8c4572937a8ee70f43ce9fd1ad
Author: Yanfei Lei 
AuthorDate: Tue Jul 9 18:31:31 2024 +0800

[hotfix][checkpoint] Rename file-merging options in documents (#25049)
---
 .../docs/dev/datastream/fault-tolerance/checkpointing.md   | 10 +-
 .../docs/dev/datastream/fault-tolerance/checkpointing.md   | 10 +-
 .../org/apache/flink/configuration/CheckpointingOptions.java   |  7 ---
 .../api/environment/ExecutionCheckpointingOptions.java |  7 ---
 4 files changed, 18 insertions(+), 16 deletions(-)

diff --git 
a/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md 
b/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md
index 9d76b5bec06..3d01e0ffddf 100644
--- a/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md
+++ b/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md
@@ -253,18 +253,18 @@ StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironm
 ##  统一的 checkpoint 文件合并机制 (实验性功能)
 
 Flink 1.20 引入了 MVP 版本的统一 checkpoint 文件合并机制,该机制允许把分散的 checkpoint 小文件合并到大文件中,减少 
checkpoint 文件创建删除的次数,
-有助于减轻文件过多问题带来的文件系统元数据管理的压力。可以通过将 `state.checkpoints.file-merging.enabled` 设置为 
`true` 来开启该机制。
-**注意**,考虑 trade-off,开启该机制会导致空间放大,即文件系统上的实际占用会比 state size 更大,可以通过设置 
`state.checkpoints.file-merging.max-space-amplification`
+有助于减轻文件过多问题带来的文件系统元数据管理的压力。可以通过将 
`execution.checkpointing.file-merging.enabled` 设置为 `true` 来开启该机制。
+**注意**,考虑 trade-off,开启该机制会导致空间放大,即文件系统上的实际占用会比 state size 更大,可以通过设置 
`execution.checkpointing.file-merging.max-space-amplification`
 来控制文件放大的上限。
 
 该机制适用于 Flink 中的 keyed state、operator state 和 channel state。对 shared scope 
state 
 提供 subtask 级别的合并;对 private scope state 提供 TaskManager 级别的合并,可以通过
- `state.checkpoints.file-merging.max-subtasks-per-file` 选项配置单个文件允许写入的最大 
subtask 数目。
+ `execution.checkpointing.file-merging.max-subtasks-per-file` 选项配置单个文件允许写入的最大 
subtask 数目。
 
-统一文件合并机制也支持跨 checkpoint 的文件合并,通过设置 
`state.checkpoints.file-merging.across-checkpoint-boundary` 为 `true` 开启。
+统一文件合并机制也支持跨 checkpoint 的文件合并,通过设置 
`execution.checkpointing.file-merging.across-checkpoint-boundary` 为 `true` 开启。
 
 该机制引入了文件池用于处理并发写的场景,文件池有两种模式,Non-blocking 
模式的文件池会对每个文件请求即时返回一个物理文件,在频繁请求的情况下会创建出许多物理文件;
-而 Blocking 模式的文件池会一直阻塞文件请求,直到文件池中有返回的文件可用,可以通过设置 
`state.checkpoints.file-merging.pool-blocking` 为 `true` 
+而 Blocking 模式的文件池会一直阻塞文件请求,直到文件池中有返回的文件可用,可以通过设置 
`execution.checkpointing.file-merging.pool-blocking` 为 `true` 
 选择 Blocking 模式,设置为 `false` 选择 Non-blocking 模式。
 
 {{< top >}}
diff --git a/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md 
b/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md
index 5d410eb6780..6b3d625696d 100644
--- a/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md
+++ b/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md
@@ -297,21 +297,21 @@ to be completed.
 The unified file merging mechanism for checkpointing is introduced to Flink 
1.20 as an MVP ("minimum viable product") feature, 
 which allows scattered small checkpoint files to be written into larger files, 
reducing the number of file creations 
 and file deletions, which alleviates the pressure of file system metadata 
management raised by the file flooding problem during checkpoints.
-The mechanism can be enabled by setting 
`state.checkpoints.file-merging.enabled` to `true`.
+The mechanism can be enabled by setting 
`execution.checkpointing.file-merging.enabled` to `true`.
 **Note** that as a trade-off, enabling this mechanism may lead to space 
amplification, that is, the actual occupation on the file system
-will be larger than actual state size. 
`state.checkpoints.file-merging.max-space-amplification` 
+will be larger than actual state size. 
`execution.checkpointing.file-merging.max-space-amplification` 
 can be used to limit the upper bound of space amplification.
 
 This mechanism is applicable to keyed state, operator state and channel state 
in Flink. Merging at subtask level is 
 provided for shared scope state; Merging at TaskManager level is provided for 
private scope state. The maximum number of subtasks
-allowed to be written to a single file can be configured through the 
`state.checkpoints.file-merging.max-subtasks-per-file` option.
+allowed to be written to a single file can be configured through the 
`execution.checkpointing.file-merging.max-subtasks-per-file` option.
 
 This feature also supports merging files across checkpoints. To enable this, 
set
-`state.checkpoints.file-merging.across-checkpoint-bo

(flink) branch release-1.18 updated: [BP-1.18] [FLINK-33192][runtime] Fix Memory Leak in WindowOperator due to Improper Timer Cleanup (#25032)

2024-07-07 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.18 by this push:
 new 87b36233512 [BP-1.18] [FLINK-33192][runtime] Fix Memory Leak in 
WindowOperator due to Improper Timer Cleanup (#25032)
87b36233512 is described below

commit 87b362335128e62112cf34839837b2d635d93df2
Author: Kartikey Pant 
AuthorDate: Mon Jul 8 07:41:47 2024 +0530

[BP-1.18] [FLINK-33192][runtime] Fix Memory Leak in WindowOperator due to 
Improper Timer Cleanup (#25032)

* [FLINK-33192][runtime] Fix Memory Leak in WindowOperator due to Improper 
Timer Cleanup
* Fix one of the added WindowOperatorTest methods to be public
---
 .../operators/windowing/WindowOperator.java|  10 +-
 .../operators/windowing/WindowOperatorTest.java| 151 +
 2 files changed, 155 insertions(+), 6 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index a45b5b68e72..a660a4c2ea6 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -372,10 +372,9 @@ public class WindowOperator
 
 if (triggerResult.isFire()) {
 ACC contents = windowState.get();
-if (contents == null) {
-continue;
+if (contents != null) {
+emitWindowContents(actualWindow, contents);
 }
-emitWindowContents(actualWindow, contents);
 }
 
 if (triggerResult.isPurge()) {
@@ -405,10 +404,9 @@ public class WindowOperator
 
 if (triggerResult.isFire()) {
 ACC contents = windowState.get();
-if (contents == null) {
-continue;
+if (contents != null) {
+emitWindowContents(window, contents);
 }
-emitWindowContents(window, contents);
 }
 
 if (triggerResult.isPurge()) {
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index bc5ef7f0b05..dda2728ab44 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -84,6 +84,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -3069,6 +3070,114 @@ public class WindowOperatorTest extends TestLogger {
 testHarness.close();
 }
 
+@Test
+public void testCleanupTimerWithEmptyStateNoResultForTumblingWindows() 
throws Exception {
+final int windowSize = 2;
+final long lateness = 1;
+
+ListStateDescriptor<Tuple2<String, Integer>> windowStateDesc =
+new ListStateDescriptor<>(
+"window-contents",
+STRING_INT_TUPLE.createSerializer(new 
ExecutionConfig()));
+
+WindowOperator<
+String,
+Tuple2<String, Integer>,
+Iterable<Tuple2<String, Integer>>,
+Tuple2<String, Integer>,
+TimeWindow>
+operator =
+new WindowOperator<>(
+
TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
+new TimeWindow.Serializer(),
+new TupleKeySelector(),
+
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(
+new ExecutionConfig()),
+windowStateDesc,
+new InternalIterableWindowFunction<>(new 
EmptyReturnFunction()),
+new 
FireEverytimeOnElementAndEventTimeTrigger(),
+lateness,
+null /* late data output tag */);
+
+OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>>
+ 

(flink) branch master updated: [FLINK-33192][runtime] Fix Memory Leak in WindowOperator due to Improper Timer Cleanup (#24917)

2024-07-05 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new e78524a8780 [FLINK-33192][runtime] Fix Memory Leak in WindowOperator 
due to Improper Timer Cleanup (#24917)
e78524a8780 is described below

commit e78524a87808913bfcd6a84847af46501a9a7266
Author: Kartikey Pant 
AuthorDate: Sat Jul 6 10:04:43 2024 +0530

[FLINK-33192][runtime] Fix Memory Leak in WindowOperator due to Improper 
Timer Cleanup (#24917)
---
 .../operators/windowing/WindowOperator.java|  10 +-
 .../operators/windowing/WindowOperatorTest.java| 150 +
 2 files changed, 154 insertions(+), 6 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 5c3929f79dc..e67800f712d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -376,10 +376,9 @@ public class WindowOperator
 
 if (triggerResult.isFire()) {
 ACC contents = windowState.get();
-if (contents == null) {
-continue;
+if (contents != null) {
+emitWindowContents(actualWindow, contents);
 }
-emitWindowContents(actualWindow, contents);
 }
 
 if (triggerResult.isPurge()) {
@@ -409,10 +408,9 @@ public class WindowOperator
 
 if (triggerResult.isFire()) {
 ACC contents = windowState.get();
-if (contents == null) {
-continue;
+if (contents != null) {
+emitWindowContents(window, contents);
 }
-emitWindowContents(window, contents);
 }
 
 if (triggerResult.isPurge()) {
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 9950dd89b95..48a09ec4fa2 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -3130,6 +3130,114 @@ class WindowOperatorTest {
 testHarness.close();
 }
 
+@Test
+void testCleanupTimerWithEmptyStateNoResultForTumblingWindows() throws 
Exception {
+final int windowSize = 2;
+final long lateness = 1;
+
+ListStateDescriptor<Tuple2<String, Integer>> windowStateDesc =
+new ListStateDescriptor<>(
+"window-contents",
+STRING_INT_TUPLE.createSerializer(new 
SerializerConfigImpl()));
+
+WindowOperator<
+String,
+Tuple2<String, Integer>,
+Iterable<Tuple2<String, Integer>>,
+Tuple2<String, Integer>,
+TimeWindow>
+operator =
+new WindowOperator<>(
+
TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
+new TimeWindow.Serializer(),
+new TupleKeySelector(),
+
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(
+new SerializerConfigImpl()),
+windowStateDesc,
+new InternalIterableWindowFunction<>(new 
EmptyReturnFunction()),
+new 
FireEverytimeOnElementAndEventTimeTrigger(),
+lateness,
+null /* late data output tag */);
+
+OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>>
+testHarness = createTestHarness(operator);
+
+testHarness.open();
+
+ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+// normal element
+testHarness.processElement(new StreamRecord<>(new Tuple2<>("test_key", 
1), 1000));
+assertThat(
+operator.processContext
+.windowState()
+.getListState(windowStateDesc)
+.get()
+   

(flink) branch master updated: [FLINK-32091][checkpoint] Add file size metrics for file-merging (#24922)

2024-06-14 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 54978689f68 [FLINK-32091][checkpoint] Add file size metrics for 
file-merging (#24922)
54978689f68 is described below

commit 54978689f68ffa03e7b89292f6a7aa94a2406116
Author: Yanfei Lei 
AuthorDate: Fri Jun 14 18:38:42 2024 +0800

[FLINK-32091][checkpoint] Add file size metrics for file-merging (#24922)
---
 docs/content.zh/docs/ops/metrics.md|  21 +
 docs/content/docs/ops/metrics.md   |  21 +
 ...AcrossCheckpointFileMergingSnapshotManager.java |   6 +-
 .../filemerging/FileMergingMetricGroup.java|  54 +++
 .../FileMergingSnapshotManagerBase.java|   8 +-
 .../FileMergingSnapshotManagerBuilder.java |  21 -
 ...WithinCheckpointFileMergingSnapshotManager.java |   6 +-
 .../state/TaskExecutorFileMergingManager.java  |   5 +-
 .../flink/runtime/taskexecutor/TaskExecutor.java   |   3 +-
 .../filemerging/FileMergingMetricsTest.java| 103 +
 .../FileMergingSnapshotManagerTestBase.java|   4 +
 .../state/TaskExecutorFileMergingManagerTest.java  |  25 -
 12 files changed, 264 insertions(+), 13 deletions(-)

diff --git a/docs/content.zh/docs/ops/metrics.md 
b/docs/content.zh/docs/ops/metrics.md
index f0778bdc292..e461044dddf 100644
--- a/docs/content.zh/docs/ops/metrics.md
+++ b/docs/content.zh/docs/ops/metrics.md
@@ -1358,6 +1358,27 @@ Note that for failed checkpoints, metrics are updated on 
a best efforts basis an
   The time in nanoseconds that elapsed between the creation of the 
last checkpoint and the time when the checkpointing process has started by this 
Task. This delay shows how long it takes for the first checkpoint barrier to 
reach the task. A high value indicates back-pressure. If only a specific task 
has a long start delay, the most likely reason is data skew.
   Gauge
 
+
+  Job (only available on TaskManager)
+  fileMerging.logicalFileCount
+  The number of logical files of file merging mechanism.
+  Gauge
+
+
+  fileMerging.logicalFileSize
+  The total size of logical files of file merging mechanism on one 
task manager for one job.
+  Gauge
+
+
+  fileMerging.physicalFileCount
+  The number of physical files of file merging mechanism.
+  Gauge
+
+
+  fileMerging.physicalFileSize
+  The total size of physical files of file merging mechanism on one 
task manager for one job, usually larger than 
fileMerging.logicalFileSize.
+  Gauge
+
   
 
 
diff --git a/docs/content/docs/ops/metrics.md b/docs/content/docs/ops/metrics.md
index f65d3cf74e4..5ab5cb21180 100644
--- a/docs/content/docs/ops/metrics.md
+++ b/docs/content/docs/ops/metrics.md
@@ -1348,6 +1348,27 @@ Note that for failed checkpoints, metrics are updated on 
a best efforts basis an
   The time in nanoseconds that elapsed between the creation of the 
last checkpoint and the time when the checkpointing process has started by this 
Task. This delay shows how long it takes for the first checkpoint barrier to 
reach the task. A high value indicates back-pressure. If only a specific task 
has a long start delay, the most likely reason is data skew.
   Gauge
 
+
+  Job (only available on TaskManager)
+  fileMerging.logicalFileCount
+  The number of logical files of file merging mechanism.
+  Gauge
+
+
+  fileMerging.logicalFileSize
+  The total size of logical files of file merging mechanism on one 
task manager for one job.
+  Gauge
+
+
+  fileMerging.physicalFileCount
+  The number of physical files of file merging mechanism.
+  Gauge
+
+
+  fileMerging.physicalFileSize
+  The total size of physical files of file merging mechanism on one 
task manager for one job, usually larger than 
fileMerging.logicalFileSize.
+  Gauge
+
   
 
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManager.java
index be0aeed9a58..6bebd1586f3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManager.java
@@ -18,6 +18,7 @@
 package org.apache.flink.runtime.checkpoint.filemerging;
 
 import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
 
 import javax.annotation.Nonnull;
@@ -35,8 +36,9 @@ public

(flink) branch master updated (9fbe9f30480 -> 1e996b8731b)

2024-05-27 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 9fbe9f30480 [FLINK-35460][state] Adjust read size when ByteBuffer size 
is larger than file size for ForSt (#24847)
 add 1e996b8731b [FLINK-35457][checkpoint] Hotfix! close physical file 
under the protection of lock (#24846)

No new revisions were added by this update.

Summary of changes:
 .../FileMergingSnapshotManagerBase.java| 61 --
 .../checkpoint/filemerging/PhysicalFile.java   | 13 +++--
 2 files changed, 41 insertions(+), 33 deletions(-)



(flink) branch master updated: [FLINK-35382][test] Disable snapshot-file-merging in ChangelogCompatibilityITCase (#24813)

2024-05-20 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 26b149a1664 [FLINK-35382][test] Disable snapshot-file-merging in 
ChangelogCompatibilityITCase (#24813)
26b149a1664 is described below

commit 26b149a1664ca8ea395badbb02e1e245a623ee90
Author: Jinzhong Li 
AuthorDate: Tue May 21 10:30:53 2024 +0800

[FLINK-35382][test] Disable snapshot-file-merging in 
ChangelogCompatibilityITCase (#24813)
---
 .../apache/flink/test/state/ChangelogCompatibilityITCase.java | 11 +++
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java
index 3844194821a..675d26bb3c6 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java
@@ -137,9 +137,9 @@ public class ChangelogCompatibilityITCase {
 
 private StreamExecutionEnvironment initEnvironment() {
 Configuration conf = new Configuration();
-conf.set(
-FILE_MERGING_ENABLED, false); // TODO: remove file merging 
setting after FLINK-32085
-StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+// TODO:remove file-merging setting after FLINK-32085 & FLINK-32081 
are resolved.
+conf.set(FILE_MERGING_ENABLED, false);
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(conf);
 env.enableChangelogStateBackend(testCase.startWithChangelog);
 if (testCase.restoreSource == RestoreSource.CHECKPOINT) {
 env.enableCheckpointing(50);
@@ -182,7 +182,10 @@ public class ChangelogCompatibilityITCase {
 }
 
 private void restoreAndValidate(String location) {
-StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+Configuration conf = new Configuration();
+// TODO:remove file-merging setting after FLINK-32085 & FLINK-32081 
are resolved.
+conf.set(FILE_MERGING_ENABLED, false);
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(conf);
 env.enableChangelogStateBackend(testCase.restoreWithChangelog);
 JobGraph jobGraph = addGraph(env);
 jobGraph.setSavepointRestoreSettings(forPath(location));



(flink) branch master updated: [FLINK-35030][runtime] Introduce Epoch Manager for under async execution (#24748)

2024-05-16 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new f1ecb9e4701 [FLINK-35030][runtime] Introduce Epoch Manager for under 
async execution (#24748)
f1ecb9e4701 is described below

commit f1ecb9e4701d612050da54589a8f561857debf34
Author: Yanfei Lei 
AuthorDate: Fri May 17 13:24:38 2024 +0800

[FLINK-35030][runtime] Introduce Epoch Manager for under async execution 
(#24748)
---
 .../asyncprocessing/AsyncExecutionController.java  |  41 +++-
 .../runtime/asyncprocessing/EpochManager.java  | 210 +
 .../runtime/asyncprocessing/RecordContext.java |  23 ++-
 .../AsyncExecutionControllerTest.java  |  92 +
 .../ContextStateFutureImplTest.java|   3 +-
 .../runtime/asyncprocessing/EpochManagerTest.java  |  61 ++
 .../state/forst/ForStDBOperationTestBase.java  |   4 +-
 .../flink/state/forst/ForStStateExecutorTest.java  |   4 +-
 .../api/operators/AbstractStreamOperatorV2.java|   4 +-
 .../AbstractAsyncStateStreamOperator.java  |  23 +++
 .../AbstractAsyncStateStreamOperatorV2.java|  31 +++
 11 files changed, 484 insertions(+), 12 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
index f33794e16c9..693ad8753f1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.common.state.v2.State;
 import org.apache.flink.core.state.InternalStateFuture;
 import 
org.apache.flink.core.state.StateFutureImpl.AsyncFrameworkExceptionHandler;
+import org.apache.flink.runtime.asyncprocessing.EpochManager.ParallelMode;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.ThrowingRunnable;
@@ -76,6 +77,9 @@ public class AsyncExecutionController implements 
StateRequestHandler {
  */
 private final MailboxExecutor mailboxExecutor;
 
+/** Exception handler to handle the exception thrown by asynchronous 
framework. */
+private final AsyncFrameworkExceptionHandler exceptionHandler;
+
 /** The key accounting unit which is used to detect the key conflict. */
 final KeyAccountingUnit keyAccountingUnit;
 
@@ -86,7 +90,7 @@ public class AsyncExecutionController implements 
StateRequestHandler {
 private final StateFutureFactory stateFutureFactory;
 
 /** The state executor where the {@link StateRequest} is actually 
executed. */
-StateExecutor stateExecutor;
+final StateExecutor stateExecutor;
 
 /** The corresponding context that currently runs in task thread. */
 RecordContext currentContext;
@@ -102,6 +106,15 @@ public class AsyncExecutionController implements 
StateRequestHandler {
 /** Max parallelism of the job. */
 private final int maxParallelism;
 
+/** The reference of epoch manager. */
+final EpochManager epochManager;
+
+/**
+ * The parallel mode of epoch execution. Keep this field internal for now, 
until we could see
+ * the concrete need for {@link ParallelMode#PARALLEL_BETWEEN_EPOCH} from 
average users.
+ */
+final ParallelMode epochParallelMode = ParallelMode.SERIAL_BETWEEN_EPOCH;
+
 public AsyncExecutionController(
 MailboxExecutor mailboxExecutor,
 AsyncFrameworkExceptionHandler exceptionHandler,
@@ -112,6 +125,7 @@ public class AsyncExecutionController implements 
StateRequestHandler {
 int maxInFlightRecords) {
 this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
 this.mailboxExecutor = mailboxExecutor;
+this.exceptionHandler = exceptionHandler;
 this.stateFutureFactory = new StateFutureFactory<>(this, 
mailboxExecutor, exceptionHandler);
 this.stateExecutor = stateExecutor;
 this.batchSize = batchSize;
@@ -131,11 +145,13 @@ public class AsyncExecutionController implements 
StateRequestHandler {
 },
 "AEC-buffer-timeout"));
 
+this.epochManager = new EpochManager(this);
 LOG.info(
-"Create AsyncExecutionController: batchSize {}, bufferTimeout 
{}, maxInFlightRecordNum {}",
+"Create AsyncExecutionController: batchSize {}, bufferTimeout 
{}, maxInFlightRecordNum {}, epochParallelMode {}",
 this.batc

(flink) branch master updated: [FLINK-32082][docs] Documentation of checkpoint file-merging (#24766)

2024-05-16 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new a8cf2ba1224 [FLINK-32082][docs] Documentation of checkpoint 
file-merging (#24766)
a8cf2ba1224 is described below

commit a8cf2ba1224b0729eedc8712cfe60c30c590fc11
Author: Yanfei Lei 
AuthorDate: Thu May 16 15:42:38 2024 +0800

[FLINK-32082][docs] Documentation of checkpoint file-merging (#24766)
---
 .../datastream/fault-tolerance/checkpointing.md| 17 +
 .../datastream/fault-tolerance/checkpointing.md| 22 ++
 2 files changed, 39 insertions(+)

diff --git 
a/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md 
b/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md
index b8148aaae10..9acbdd4c324 100644
--- a/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md
+++ b/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md
@@ -250,5 +250,22 @@ StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironm
 需要注意的是,这一行为可能会延长任务运行的时间,如果 checkpoint 周期比较大,这一延迟会非常明显。
 极端情况下,如果 checkpoint 的周期被设置为 `Long.MAX_VALUE`,那么任务永远不会结束,因为下一次 checkpoint 不会进行。
 
+##  统一的 checkpoint 文件合并机制 (实验性功能)
+
+Flink 1.20 引入了 MVP 版本的统一 checkpoint 文件合并机制,该机制允许把分散的 checkpoint 小文件合并到大文件中,减少 
checkpoint 文件创建删除的次数,
+有助于减轻文件过多问题带来的文件系统元数据管理的压力。可以通过将 `state.checkpoints.file-merging.enabled` 设置为 
`true` 来开启该机制。
+**注意**,考虑 trade-off,开启该机制会导致空间放大,即文件系统上的实际占用会比 state size 更大,可以通过设置 
`state.checkpoints.file-merging.max-space-amplification`
+来控制文件放大的上限。
+
+该机制适用于 Flink 中的 keyed state、operator state 和 channel state。对 shared scope 
state 
+提供 subtask 级别的合并;对 private scope state 提供 TaskManager 级别的合并,可以通过
+ `state.checkpoints.file-merging.max-subtasks-per-file` 选项配置单个文件允许写入的最大 
subtask 数目。
+
+统一文件合并机制也支持跨 checkpoint 的文件合并,通过设置 
`state.checkpoints.file-merging.across-checkpoint-boundary` 为 `true` 开启。
+
+该机制引入了文件池用于处理并发写的场景,文件池有两种模式,Non-blocking 
模式的文件池会对每个文件请求即时返回一个物理文件,在频繁请求的情况下会创建出许多物理文件;
+而 Blocking 模式的文件池会一直阻塞文件请求,直到文件池中有返回的文件可用,可以通过设置 
`state.checkpoints.file-merging.pool-blocking` 为 `true` 
+选择 Blocking 模式,设置为 `false` 选择 Non-blocking 模式。
+
 {{< top >}}
 
diff --git a/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md 
b/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md
index 5dc9e8499f6..b8eb5888fa8 100644
--- a/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md
+++ b/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md
@@ -292,4 +292,26 @@ The final checkpoint would be triggered immediately after 
all operators have rea
 without waiting for periodic triggering, but the job will need to wait for 
this final checkpoint 
 to be completed.
 
+## Unified file merging mechanism for checkpoints (Experimental)
+
+The unified file merging mechanism for checkpointing is introduced to Flink 
1.20 as an MVP ("minimum viable product") feature, 
+which allows scattered small checkpoint files to be written into larger files, 
reducing the number of file creations 
+and file deletions, which alleviates the pressure of file system metadata 
management raised by the file flooding problem during checkpoints.
+The mechanism can be enabled by setting 
`state.checkpoints.file-merging.enabled` to `true`.
+**Note** that as a trade-off, enabling this mechanism may lead to space 
amplification, that is, the actual space occupied on the file system
+will be larger than the actual state size. 
`state.checkpoints.file-merging.max-space-amplification` 
+can be used to limit the upper bound of space amplification.
+
+This mechanism is applicable to keyed state, operator state and channel state 
in Flink. Merging at subtask level is 
+provided for shared scope state; Merging at TaskManager level is provided for 
private scope state. The maximum number of subtasks
+allowed to be written to a single file can be configured through the 
`state.checkpoints.file-merging.max-subtasks-per-file` option.
+
+This feature also supports merging files across checkpoints. To enable this, 
set
+`state.checkpoints.file-merging.across-checkpoint-boundary` to `true`.
+
+This mechanism introduces a file pool to handle concurrent writing scenarios. 
There are two modes: the non-blocking mode 
+always provides a usable physical file immediately when it receives a file 
request, so it may create many physical files if files are 
+requested frequently; the blocking mode blocks each file request until a 
returned file is available in the file pool. The mode can be configured by
+setting `state.checkpoints.file-merging.pool-blocking` to `true` for blocking 
or `false` for non-blocking.
+
 {{< top >}}



(flink) branch master updated: [FLINK-32092][tests] Integrate snapshot file-merging with existing IT cases (#24789)

2024-05-15 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new b87ead743dc [FLINK-32092][tests] Integrate snapshot file-merging with 
existing IT cases (#24789)
b87ead743dc is described below

commit b87ead743dca161cdae8a1fef761954d206b81fb
Author: Yanfei Lei 
AuthorDate: Thu May 16 10:12:12 2024 +0800

[FLINK-32092][tests] Integrate snapshot file-merging with existing IT cases 
(#24789)
---
 .../flink/runtime/state/OperatorStateRestoreOperation.java |  4 +++-
 .../runtime/state/filemerging/SegmentFileStateHandle.java  |  3 ++-
 .../org/apache/flink/streaming/runtime/tasks/StreamTask.java   | 10 ++
 .../org/apache/flink/streaming/util/TestStreamEnvironment.java |  7 ++-
 .../flink/test/checkpointing/ChangelogRecoveryITCaseBase.java  |  6 +-
 .../flink/test/checkpointing/RestoreUpgradedJobITCase.java |  7 ++-
 .../apache/flink/test/state/ChangelogCompatibilityITCase.java  |  4 
 .../flink/test/state/ChangelogRecoveryCachingITCase.java   |  3 +++
 .../org/apache/flink/test/state/ChangelogRescalingITCase.java  |  3 +++
 9 files changed, 38 insertions(+), 9 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java
index 33ec3d92ab8..e7ba144d5b7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java
@@ -24,6 +24,7 @@ import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import 
org.apache.flink.runtime.state.filemerging.EmptyFileMergingOperatorStreamStateHandle;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import org.apache.flink.util.Preconditions;
 
@@ -66,7 +67,8 @@ public class OperatorStateRestoreOperation implements 
RestoreOperation {
 
 for (OperatorStateHandle stateHandle : stateHandles) {
 
-if (stateHandle == null) {
+if (stateHandle == null
+|| stateHandle instanceof 
EmptyFileMergingOperatorStreamStateHandle) {
 continue;
 }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/SegmentFileStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/SegmentFileStateHandle.java
index 11d68e7b394..a4b31cc310e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/SegmentFileStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/SegmentFileStateHandle.java
@@ -106,7 +106,8 @@ public class SegmentFileStateHandle implements 
StreamStateHandle {
 
 @Override
 public PhysicalStateHandleID getStreamStateHandleID() {
-return new PhysicalStateHandleID(filePath.toUri().toString());
+return new PhysicalStateHandleID(
+String.format("%s-%d-%d", filePath.toUri(), startPos, 
stateSize));
 }
 
 public long getStartPos() {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index e0ffe9b9267..433edf63f25 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -562,10 +562,12 @@ public abstract class StreamTask>
 return checkpointStorageAccess;
 }
 try {
-CheckpointStorageWorkerView mergingCheckpointStorageAccess =
-checkpointStorageAccess.toFileMergingStorage(
-fileMergingSnapshotManager, environment);
-return (CheckpointStorageAccess) mergingCheckpointStorageAccess;
+CheckpointStorageAccess mergingCheckpointStorageAccess =
+(CheckpointStorageAccess)
+checkpointStorageAccess.toFileMergingStorage(
+fileMergingSnapshotManager, environment);
+
mergingCheckpointStorageAccess.initializeBaseLocationsForCheckpoint();
+return mergingCheckpointStorageAccess;
 } catch (IOException e) {
 LOG.warn(
 "Initiating FsMergingCheckpointStorageAccess failed "
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnv

(flink) branch master updated: [FLINK-32087][checkpoint] Introduce space amplification statistics of file merging (#24762)

2024-05-14 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 9a5a99b1a30 [FLINK-32087][checkpoint] Introduce space amplification 
statistics of file merging (#24762)
9a5a99b1a30 is described below

commit 9a5a99b1a30054268bbde36d565cbb1b81018890
Author: Yanfei Lei 
AuthorDate: Tue May 14 16:21:03 2024 +0800

[FLINK-32087][checkpoint] Introduce space amplification statistics of file 
merging (#24762)
---
 .../filemerging/FileMergingSnapshotManager.java| 75 ++
 .../FileMergingSnapshotManagerBase.java| 49 ++
 .../checkpoint/filemerging/LogicalFile.java|  1 +
 .../checkpoint/filemerging/PhysicalFile.java   | 32 +++--
 ...ssCheckpointFileMergingSnapshotManagerTest.java | 30 -
 .../FileMergingSnapshotManagerTestBase.java| 47 ++
 ...inCheckpointFileMergingSnapshotManagerTest.java | 30 -
 ...FileMergingCheckpointStateOutputStreamTest.java |  5 +-
 8 files changed, 247 insertions(+), 22 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
index add8806369c..f3523c4430f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
@@ -34,6 +34,7 @@ import 
org.apache.flink.runtime.state.filesystem.FsCheckpointStorageAccess;
 
 import java.io.Closeable;
 import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Stream;
 
 /**
@@ -268,4 +269,78 @@ public interface FileMergingSnapshotManager extends 
Closeable {
 "%s-%s(%d/%d)", jobIDString, operatorIDString, 
subtaskIndex, parallelism);
 }
 }
+
+/** Space usage statistics of a managed directory. */
+final class SpaceStat {
+
+AtomicLong physicalFileCount;
+AtomicLong physicalFileSize;
+
+AtomicLong logicalFileCount;
+AtomicLong logicalFileSize;
+
+public SpaceStat() {
+this(0, 0, 0, 0);
+}
+
+public SpaceStat(
+long physicalFileCount,
+long physicalFileSize,
+long logicalFileCount,
+long logicalFileSize) {
+this.physicalFileCount = new AtomicLong(physicalFileCount);
+this.physicalFileSize = new AtomicLong(physicalFileSize);
+this.logicalFileCount = new AtomicLong(logicalFileCount);
+this.logicalFileSize = new AtomicLong(logicalFileSize);
+}
+
+public void onLogicalFileCreate(long size) {
+physicalFileSize.addAndGet(size);
+logicalFileSize.addAndGet(size);
+logicalFileCount.incrementAndGet();
+}
+
+public void onLogicalFileDelete(long size) {
+logicalFileSize.addAndGet(-size);
+logicalFileCount.decrementAndGet();
+}
+
+public void onPhysicalFileCreate() {
+physicalFileCount.incrementAndGet();
+}
+
+public void onPhysicalFileDelete(long size) {
+physicalFileSize.addAndGet(-size);
+physicalFileCount.decrementAndGet();
+}
+
+@Override
+public boolean equals(Object o) {
+if (this == o) {
+return true;
+}
+if (o == null || getClass() != o.getClass()) {
+return false;
+}
+SpaceStat spaceStat = (SpaceStat) o;
+return physicalFileCount.get() == spaceStat.physicalFileCount.get()
+&& physicalFileSize.get() == 
spaceStat.physicalFileSize.get()
+&& logicalFileCount.get() == 
spaceStat.logicalFileCount.get()
+&& logicalFileSize.get() == 
spaceStat.logicalFileSize.get();
+}
+
+@Override
+public String toString() {
+return "SpaceStat{"
++ "physicalFileCount="
++ physicalFileCount.get()
++ ", physicalFileSize="
++ physicalFileSize.get()
++ ", logicalFileCount="
++ logicalFileCount.get()
++ ", logicalFileSize="
++ logicalFileSize.get()
++ '}';
+}
+}
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java
 
b/flink-runtime/src/main/java/org/apa

(flink) branch master updated (547e4b53ebe -> 4fe66e06974)

2024-05-09 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 547e4b53ebe [FLINK-35270]Enrich information in logs, making it easier 
for debugging
 add 4fe66e06974 [FLINK-32084][checkpoint] Migrate current file merging of 
channel state snapshot into the unify file merging framework

No new revisions were added by this update.

Summary of changes:
 .../ChannelStateWriteRequestDispatcherImpl.java| 20 +-
 .../ChannelStateWriteRequestExecutorFactory.java   | 24 +---
 .../checkpoint/channel/ChannelStateWriterImpl.java | 13 ---
 .../filesystem/FsCheckpointStorageAccess.java  |  1 -
 .../FsMergingCheckpointStorageAccess.java  |  6 ++-
 ...ChannelStateWriteRequestDispatcherImplTest.java | 12 ++
 .../ChannelStateWriteRequestDispatcherTest.java|  3 +-
 ...hannelStateWriteRequestExecutorFactoryTest.java | 11 --
 .../ChannelStateWriteRequestExecutorImplTest.java  |  3 +-
 .../channel/ChannelStateWriterImplTest.java| 12 --
 .../operators/testutils/DummyEnvironment.java  | 14 +++
 .../runtime/state/ChannelPersistenceITCase.java|  4 +-
 .../environment/ExecutionCheckpointingOptions.java |  9 +
 .../flink/streaming/runtime/tasks/StreamTask.java  | 43 ++
 .../tasks/SubtaskCheckpointCoordinatorImpl.java| 43 +++---
 .../MockSubtaskCheckpointCoordinatorBuilder.java   | 27 --
 .../runtime/tasks/StreamMockEnvironment.java   | 11 ++
 .../tasks/StreamTaskMailboxTestHarnessBuilder.java |  4 ++
 .../tasks/SubtaskCheckpointCoordinatorTest.java| 10 -
 19 files changed, 166 insertions(+), 104 deletions(-)



(flink) branch master updated: [FLINK-35158][runtime] Error handling in StateFuture's callback

2024-05-08 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new bb0f4429598 [FLINK-35158][runtime] Error handling in StateFuture's 
callback
bb0f4429598 is described below

commit bb0f4429598db703e69d951986291862cbd5416b
Author: fredia 
AuthorDate: Mon Apr 22 22:24:29 2024 +0800

[FLINK-35158][runtime] Error handling in StateFuture's callback
---
 .../flink/api/common/state/v2/StateFuture.java |  18 +-
 .../flink/util/function/FunctionWithException.java |  22 ++
 .../flink/util/function/ThrowingConsumer.java  |  19 ++
 .../flink/core/state/CompletedStateFuture.java |  39 ++-
 .../flink/core/state/InternalStateFuture.java  |   5 +-
 .../apache/flink/core/state/StateFutureImpl.java   | 262 +
 .../apache/flink/core/state/StateFutureTest.java   |  43 +++-
 .../asyncprocessing/AsyncExecutionController.java  |  16 +-
 .../asyncprocessing/AsyncStateException.java   |  34 ++-
 .../asyncprocessing/ContextStateFutureImpl.java|  19 +-
 .../asyncprocessing/StateFutureFactory.java|  10 +-
 .../AsyncExecutionControllerTest.java  | 161 -
 .../ContextStateFutureImplTest.java| 129 --
 .../state/forst/ForStDBOperationTestBase.java  |  39 +--
 .../AbstractAsyncStateStreamOperator.java  |  10 +-
 .../AbstractAsyncStateStreamOperatorV2.java|  15 +-
 .../InternalTimerServiceAsyncImplTest.java |  17 +-
 .../AbstractAsyncStateStreamOperatorTest.java  |   8 +-
 .../AbstractAsyncStateStreamOperatorV2Test.java|  11 +-
 19 files changed, 640 insertions(+), 237 deletions(-)

diff --git 
a/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/StateFuture.java
 
b/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/StateFuture.java
index 1f7b2f0d569..3c7faf6b928 100644
--- 
a/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/StateFuture.java
+++ 
b/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/StateFuture.java
@@ -19,10 +19,9 @@
 package org.apache.flink.api.common.state.v2;
 
 import org.apache.flink.annotation.Experimental;
-
-import java.util.function.BiFunction;
-import java.util.function.Consumer;
-import java.util.function.Function;
+import org.apache.flink.util.function.BiFunctionWithException;
+import org.apache.flink.util.function.FunctionWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
 
 /**
  * StateFuture is a future that act as a return value for async state 
interfaces. Note: All these
@@ -40,7 +39,8 @@ public interface StateFuture {
  * @param  the function's return type.
  * @return the new StateFuture.
  */
- StateFuture thenApply(Function fn);
+ StateFuture thenApply(
+FunctionWithException 
fn);
 
 /**
  * Returns a new StateFuture that, when this future completes normally, is 
executed with this
@@ -49,7 +49,7 @@ public interface StateFuture {
  * @param action the action to perform before completing the returned 
StateFuture.
  * @return the new StateFuture.
  */
-StateFuture thenAccept(Consumer action);
+StateFuture thenAccept(ThrowingConsumer action);
 
 /**
  * Returns a new future that, when this future completes normally, is 
executed with this future
@@ -58,7 +58,8 @@ public interface StateFuture {
  * @param action the action to perform.
  * @return the new StateFuture.
  */
- StateFuture thenCompose(Function> action);
+ StateFuture thenCompose(
+FunctionWithException, ? 
extends Exception> action);
 
 /**
  * Returns a new StateFuture that, when this and the other given future 
both complete normally,
@@ -71,5 +72,6 @@ public interface StateFuture {
  * @return the new StateFuture.
  */
  StateFuture thenCombine(
-StateFuture other, BiFunction fn);
+StateFuture other,
+BiFunctionWithException fn);
 }
diff --git 
a/flink-core-api/src/main/java/org/apache/flink/util/function/FunctionWithException.java
 
b/flink-core-api/src/main/java/org/apache/flink/util/function/FunctionWithException.java
index 76718656f06..86466c05d39 100644
--- 
a/flink-core-api/src/main/java/org/apache/flink/util/function/FunctionWithException.java
+++ 
b/flink-core-api/src/main/java/org/apache/flink/util/function/FunctionWithException.java
@@ -20,6 +20,8 @@ package org.apache.flink.util.function;
 
 import org.apache.flink.annotation.Public;
 
+import java.util.function.Function;
+
 /**
  * A functional interface for a {@link java.util.function.Function} that may 
throw exceptions.
  *
@@ -39,4 +41,24 @@ public interface FunctionWithException {
  * @throws E This function may throw an exception.
  */
 R apply(T value) throws E;
+
+/**
+   

(flink) branch master updated (714d1cb2e0b -> 3ff2ba43720)

2024-04-26 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 714d1cb2e0b [FLINK-35189][test-utils] Introduce test-filesystem 
connector and catalog based on filesystem to support materialized table
 add 713c30f76d5 [FLINK-35026][runtime][config] Introduce async execution 
configurations
 add 3ff2ba43720 [FLINK-35026][runtime] Implement buffer timeout of AEC

No new revisions were added by this update.

Summary of changes:
 .../apache/flink/api/common/ExecutionConfig.java   |  38 +
 .../flink/configuration/ExecutionOptions.java  |  59 +++
 .../asyncprocessing/AsyncExecutionController.java  |  48 +++---
 .../asyncprocessing/StateRequestBuffer.java|  83 +-
 .../AsyncExecutionControllerTest.java  | 182 +
 .../AbstractAsyncStateStreamOperator.java  |  23 ++-
 .../AbstractAsyncStateStreamOperatorV2.java|  18 +-
 .../StreamExecutionEnvironmentTest.java|  30 
 .../InternalTimerServiceAsyncImplTest.java |   2 +-
 9 files changed, 418 insertions(+), 65 deletions(-)



(flink) branch master updated: [hotfix][runtime] Make sure run RecordContext#release() in Task thread. (#24705)

2024-04-25 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 797cb9cd843 [hotfix][runtime] Make sure run RecordContext#release() in 
Task thread. (#24705)
797cb9cd843 is described below

commit 797cb9cd843b1f7c102339019b3dfc45c2fa329d
Author: Wang FeiFan 
AuthorDate: Thu Apr 25 17:59:27 2024 +0800

[hotfix][runtime] Make sure run RecordContext#release() in Task thread. 
(#24705)
---
 .../org/apache/flink/core/state/StateFutureImpl.java   | 18 --
 .../asyncprocessing/ContextStateFutureImpl.java| 10 +++---
 .../flink/runtime/asyncprocessing/RecordContext.java   | 17 ++---
 .../runtime/asyncprocessing/ReferenceCounted.java  | 11 ---
 .../runtime/asyncprocessing/ReferenceCountedTest.java  |  4 ++--
 5 files changed, 43 insertions(+), 17 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java 
b/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java
index d7db0be5311..ed6e5963e75 100644
--- a/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java
+++ b/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java
@@ -65,7 +65,7 @@ public class StateFutureImpl implements 
InternalStateFuture {
 (t) -> {
 callbackRunner.submit(
 () -> {
-ret.complete(fn.apply(t));
+
ret.completeInCallbackRunner(fn.apply(t));
 callbackFinished();
 });
 });
@@ -91,7 +91,7 @@ public class StateFutureImpl implements 
InternalStateFuture {
 callbackRunner.submit(
 () -> {
 action.accept(t);
-ret.complete(null);
+ret.completeInCallbackRunner(null);
 callbackFinished();
 });
 });
@@ -116,7 +116,7 @@ public class StateFutureImpl implements 
InternalStateFuture {
 callbackRunner.submit(
 () -> {
 StateFuture su = action.apply(t);
-su.thenAccept(ret::complete);
+
su.thenAccept(ret::completeInCallbackRunner);
 callbackFinished();
 });
 });
@@ -153,7 +153,8 @@ public class StateFutureImpl implements 
InternalStateFuture {
 (t) -> {
 callbackRunner.submit(
 () -> {
-
ret.complete(fn.apply(t, u));
+
ret.completeInCallbackRunner(
+
fn.apply(t, u));
 callbackFinished();
 });
 });
@@ -178,7 +179,12 @@ public class StateFutureImpl implements 
InternalStateFuture {
 @Override
 public void complete(T result) {
 completableFuture.complete(result);
-postComplete();
+postComplete(false);
+}
+
+private void completeInCallbackRunner(T result) {
+completableFuture.complete(result);
+postComplete(true);
 }
 
 /** Will be triggered when a callback is registered. */
@@ -187,7 +193,7 @@ public class StateFutureImpl implements 
InternalStateFuture {
 }
 
 /** Will be triggered when this future completes. */
-public void postComplete() {
+public void postComplete(boolean inCallbackRunner) {
 // does nothing by default.
 }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImpl.java
index 9355a43a795..cccd815af0e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImpl.java
@@ -61,17 +61,21 @@ public class ContextStateFutureImpl extends 
StateFutureImpl {
 }
 
 @Override
-public void postComple

(flink) branch master updated: [FLINK-35156][Runtime] Make operators of DataStream V2 integrate with async state processing

2024-04-24 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new a87db9333d8 [FLINK-35156][Runtime] Make operators of DataStream V2 
integrate with async state processing
a87db9333d8 is described below

commit a87db9333d80f720b6ba58e73da17d3d788c16c3
Author: Zakelly 
AuthorDate: Thu Apr 18 20:08:11 2024 +0800

[FLINK-35156][Runtime] Make operators of DataStream V2 integrate with async 
state processing
---
 .../datastream/impl/operators/ProcessOperator.java |   4 +-
 .../TwoInputBroadcastProcessOperator.java  |   4 +-
 .../TwoInputNonBroadcastProcessOperator.java   |   4 +-
 .../impl/operators/TwoOutputProcessOperator.java   |   4 +-
 .../AbstractAsyncStateUdfStreamOperator.java   | 168 +
 5 files changed, 176 insertions(+), 8 deletions(-)

diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java
index 6346079cc21..fb773e271e1 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java
@@ -23,15 +23,15 @@ import 
org.apache.flink.datastream.impl.common.OutputCollector;
 import org.apache.flink.datastream.impl.common.TimestampCollector;
 import org.apache.flink.datastream.impl.context.DefaultNonPartitionedContext;
 import org.apache.flink.datastream.impl.context.DefaultRuntimeContext;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import 
org.apache.flink.streaming.runtime.operators.asyncprocessing.AbstractAsyncStateUdfStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 /** Operator for {@link OneInputStreamProcessFunction}. */
 public class ProcessOperator
-extends AbstractUdfStreamOperator>
+extends AbstractAsyncStateUdfStreamOperator>
 implements OneInputStreamOperator, BoundedOneInput {
 
 protected transient DefaultRuntimeContext context;
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java
index 5c0faf05bab..0ba5b986fe5 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java
@@ -23,17 +23,17 @@ import 
org.apache.flink.datastream.impl.common.OutputCollector;
 import org.apache.flink.datastream.impl.common.TimestampCollector;
 import org.apache.flink.datastream.impl.context.DefaultNonPartitionedContext;
 import org.apache.flink.datastream.impl.context.DefaultRuntimeContext;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.BoundedMultiInput;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import 
org.apache.flink.streaming.runtime.operators.asyncprocessing.AbstractAsyncStateUdfStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import static org.apache.flink.util.Preconditions.checkState;
 
 /** Operator for {@link TwoInputBroadcastStreamProcessFunction}. */
 public class TwoInputBroadcastProcessOperator
-extends AbstractUdfStreamOperator<
+extends AbstractAsyncStateUdfStreamOperator<
 OUT, TwoInputBroadcastStreamProcessFunction>
 implements TwoInputStreamOperator, BoundedMultiInput {
 
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java
index 60d88682c2d..e4f02a10dcb 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java
@@ -23,17 +23,17 @@ import 
org.apache.flink.datastream.impl.common.OutputCollector;
 import org.apache.flink.datastream.impl.common.TimestampCollector;
 import org.apache.flink.datastream.impl.context.DefaultNonPartitionedC

(flink) branch master updated (ea2eabe7da5 -> 8475d284f2a)

2024-04-23 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from ea2eabe7da5 [FLINK-35214][runtime] Update result partition id for 
remote input channel when unknown input channel is updated
 add 8475d284f2a [FLINK-35158][Core/API] Move functional interfaces about 
consumers from core module to core-api

No new revisions were added by this update.

Summary of changes:
 .../util/function/BiConsumerWithException.java |  4 +--
 .../util/function/BiFunctionWithException.java |  4 +--
 .../flink/util/function/CheckedSupplier.java   |  4 +--
 .../flink/util/function/FunctionWithException.java |  0
 .../util/function/LongFunctionWithException.java   |  0
 .../apache/flink/util/function/QuadConsumer.java   |  0
 .../apache/flink/util/function/QuadFunction.java   |  0
 .../flink/util/function/RunnableWithException.java |  0
 .../flink/util/function/SerializableFunction.java  |  0
 .../flink/util/function/SerializableSupplier.java  |  0
 .../SerializableSupplierWithException.java |  0
 .../flink/util/function/SupplierWithException.java |  0
 .../flink/util/function/ThrowingConsumer.java  |  0
 .../util/function/ThrowingExceptionUtils.java  | 32 --
 .../flink/util/function/ThrowingRunnable.java  |  3 +-
 .../apache/flink/util/function/TriConsumer.java|  0
 .../util/function/TriConsumerWithException.java|  4 +--
 .../apache/flink/util/function/TriFunction.java|  0
 .../util/function/TriFunctionWithException.java|  3 +-
 pom.xml|  6 
 20 files changed, 29 insertions(+), 31 deletions(-)
 rename {flink-core => 
flink-core-api}/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java
 (95%)
 rename {flink-core => 
flink-core-api}/src/main/java/org/apache/flink/util/function/BiFunctionWithException.java
 (96%)
 rename {flink-core => 
flink-core-api}/src/main/java/org/apache/flink/util/function/CheckedSupplier.java
 (94%)
 rename {flink-core => 
flink-core-api}/src/main/java/org/apache/flink/util/function/FunctionWithException.java
 (100%)
 rename {flink-core => 
flink-core-api}/src/main/java/org/apache/flink/util/function/LongFunctionWithException.java
 (100%)
 rename {flink-core => 
flink-core-api}/src/main/java/org/apache/flink/util/function/QuadConsumer.java 
(100%)
 rename {flink-core => 
flink-core-api}/src/main/java/org/apache/flink/util/function/QuadFunction.java 
(100%)
 rename {flink-core => 
flink-core-api}/src/main/java/org/apache/flink/util/function/RunnableWithException.java
 (100%)
 rename {flink-core => 
flink-core-api}/src/main/java/org/apache/flink/util/function/SerializableFunction.java
 (100%)
 rename {flink-core => 
flink-core-api}/src/main/java/org/apache/flink/util/function/SerializableSupplier.java
 (100%)
 rename {flink-core => 
flink-core-api}/src/main/java/org/apache/flink/util/function/SerializableSupplierWithException.java
 (100%)
 copy {flink-core => 
flink-core-api}/src/main/java/org/apache/flink/util/function/SupplierWithException.java
 (100%)
 rename {flink-core => 
flink-core-api}/src/main/java/org/apache/flink/util/function/ThrowingConsumer.java
 (100%)
 rename 
flink-core/src/main/java/org/apache/flink/util/function/SupplierWithException.java
 => 
flink-core-api/src/main/java/org/apache/flink/util/function/ThrowingExceptionUtils.java
 (56%)
 rename {flink-core => 
flink-core-api}/src/main/java/org/apache/flink/util/function/ThrowingRunnable.java
 (95%)
 rename {flink-core => 
flink-core-api}/src/main/java/org/apache/flink/util/function/TriConsumer.java 
(100%)
 rename {flink-core => 
flink-core-api}/src/main/java/org/apache/flink/util/function/TriConsumerWithException.java
 (96%)
 rename {flink-core => 
flink-core-api}/src/main/java/org/apache/flink/util/function/TriFunction.java 
(100%)
 rename {flink-core => 
flink-core-api}/src/main/java/org/apache/flink/util/function/TriFunctionWithException.java
 (96%)



(flink) branch master updated (51d015b5704 -> 0b2e9880354)

2024-04-22 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 51d015b5704 [FLINK-35097][table] Use var-args in 
RawFormatSerDeSchemaTest
 add 0b2e9880354 [FLINK-35028][runtime] Timer firing under async execution 
model

No new revisions were added by this update.

Summary of changes:
 .../asyncprocessing/AsyncExecutionController.java  |   5 +-
 .../runtime/asyncprocessing/KeyAccountingUnit.java |   1 -
 .../runtime/asyncprocessing/RecordContext.java |   3 +-
 .../api/operators/AbstractStreamOperator.java  |   2 +-
 .../api/operators/AbstractStreamOperatorV2.java|   3 +-
 .../api/operators/InternalTimeServiceManager.java  |  16 ++
 .../operators/InternalTimeServiceManagerImpl.java  |  49 +
 .../operators/InternalTimerServiceAsyncImpl.java   | 137 +
 .../api/operators/InternalTimerServiceImpl.java|  22 +-
 .../BatchExecutionInternalTimeServiceManager.java  |  12 ++
 .../AbstractAsyncStateStreamOperator.java  |  54 +
 .../AbstractAsyncStateStreamOperatorV2.java|  31 +++
 .../InternalTimerServiceAsyncImplTest.java | 221 +
 .../AbstractAsyncStateStreamOperatorTest.java  |  26 +++
 .../AbstractAsyncStateStreamOperatorV2Test.java|  24 +++
 15 files changed, 589 insertions(+), 17 deletions(-)
 create mode 100644 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImpl.java
 create mode 100644 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImplTest.java



(flink) branch master updated: [FLINK-35027][runtime/checkpoint] Implement checkpoint drain in AsyncExecutionController

2024-04-21 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 9336760ace7 [FLINK-35027][runtime/checkpoint] Implement checkpoint 
drain in AsyncExecutionController
9336760ace7 is described below

commit 9336760ace73a0c84b949370eb773d71f5e3ac1f
Author: fredia 
AuthorDate: Thu Apr 18 12:32:30 2024 +0800

[FLINK-35027][runtime/checkpoint] Implement checkpoint drain in 
AsyncExecutionController
---
 .../asyncprocessing/AsyncExecutionController.java  | 45 ++-
 .../runtime/asyncprocessing/StateRequest.java  |  2 +-
 .../api/operators/AbstractStreamOperator.java  |  2 +-
 .../api/operators/AbstractStreamOperatorV2.java|  2 +-
 .../AbstractAsyncStateStreamOperator.java  | 19 
 .../AbstractAsyncStateStreamOperatorV2.java| 19 
 .../AbstractAsyncStateStreamOperatorTest.java  | 51 ++
 .../AbstractAsyncStateStreamOperatorV2Test.java| 49 +
 8 files changed, 174 insertions(+), 15 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
index 1964ec3f378..cfa0e4f0d0e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.asyncprocessing;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.common.state.v2.State;
 import org.apache.flink.core.state.InternalStateFuture;
@@ -80,7 +81,7 @@ public class AsyncExecutionController {
 private final StateFutureFactory stateFutureFactory;
 
 /** The state executor where the {@link StateRequest} is actually 
executed. */
-final StateExecutor stateExecutor;
+StateExecutor stateExecutor;
 
 /** The corresponding context that currently runs in task thread. */
 RecordContext currentContext;
@@ -235,17 +236,7 @@ public class AsyncExecutionController {
 // 2. If the state request is for a newly entered record, the 
in-flight record number should
 // be less than the max in-flight record number.
 // Note: the currentContext may be updated by {@code 
StateFutureFactory#build}.
-try {
-while (inFlightRecordNum.get() > maxInFlightRecordNum) {
-if (!mailboxExecutor.tryYield()) {
-triggerIfNeeded(true);
-Thread.sleep(1);
-}
-}
-} catch (InterruptedException ignored) {
-// ignore the interrupted exception to avoid throwing fatal error 
when the task cancel
-// or exit.
-}
+drainInflightRecords(maxInFlightRecordNum);
 // 3. Ensure the currentContext is restored.
 setCurrentContext(storedContext);
 inFlightRecordNum.incrementAndGet();
@@ -269,4 +260,34 @@ public class AsyncExecutionController {
 }
 });
 }
+
+/**
+ * A helper function to drain in-flight records until {@link 
#inFlightRecordNum} within the limit
+ * of given {@code targetNum}.
+ *
+ * @param targetNum the target {@link #inFlightRecordNum} to achieve.
+ */
+public void drainInflightRecords(int targetNum) {
+try {
+while (inFlightRecordNum.get() > targetNum) {
+if (!mailboxExecutor.tryYield()) {
+triggerIfNeeded(true);
+Thread.sleep(1);
+}
+}
+} catch (InterruptedException ignored) {
+// ignore the interrupted exception to avoid throwing fatal error 
when the task cancel
+// or exit.
+}
+}
+
+@VisibleForTesting
+public void setStateExecutor(StateExecutor stateExecutor) {
+this.stateExecutor = stateExecutor;
+}
+
+@VisibleForTesting
+public int getInFlightRecordNum() {
+return inFlightRecordNum.get();
+}
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java
index aa050cf6557..59009361751 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java
@@ -78,7 +78,7 @@ public class StateRequest implements Serializable 
{
 return state;
 }
 
-InternalStateFuture getFuture() {
+

(flink) 02/03: [FLINK-35025][Runtime/State] Abstract stream operators for async state processing

2024-04-17 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e1edb24ecc2e28987803ded3b460323c0179a052
Author: Zakelly 
AuthorDate: Fri Apr 12 21:52:29 2024 +0800

[FLINK-35025][Runtime/State] Abstract stream operators for async state 
processing
---
 .../streaming/api/operators/AbstractInput.java |  23 +++-
 .../api/operators/AbstractStreamOperator.java  |   4 +-
 .../api/operators/AbstractStreamOperatorV2.java|   2 +-
 .../streaming/runtime/io/RecordProcessorUtils.java |  10 ++
 .../AbstractAsyncStateStreamOperator.java  | 133 
 .../AbstractAsyncStateStreamOperatorV2.java| 108 
 .../asyncprocessing/AsyncStateProcessing.java  |  65 ++
 .../AsyncStateProcessingOperator.java  |  44 +++
 .../AbstractAsyncStateStreamOperatorTest.java  |  98 +++
 .../AbstractAsyncStateStreamOperatorV2Test.java| 140 +
 .../multipleinput/input/FirstInputOfTwoInput.java  |  18 ++-
 .../operators/multipleinput/input/OneInput.java|  18 ++-
 .../multipleinput/input/SecondInputOfTwoInput.java |  18 ++-
 13 files changed, 674 insertions(+), 7 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java
index 9982d12c8e8..20c3006d838 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java
@@ -19,12 +19,16 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import 
org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessing;
+import 
org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessingOperator;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
+import org.apache.flink.util.function.ThrowingConsumer;
 
 import javax.annotation.Nullable;
 
@@ -35,7 +39,8 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
  * AbstractStreamOperatorV2}.
  */
 @Experimental
-public abstract class AbstractInput implements Input, 
KeyContextHandler {
+public abstract class AbstractInput
+implements Input, KeyContextHandler, AsyncStateProcessing {
 /**
  * {@code KeySelector} for extracting a key from an element being 
processed. This is used to
  * scope keyed state to a key. This is null if the operator is not a keyed 
operator.
@@ -86,4 +91,20 @@ public abstract class AbstractInput implements 
Input, KeyContextHan
 public boolean hasKeyContext() {
 return stateKeySelector != null;
 }
+
+@Internal
+@Override
+public final boolean isAsyncStateProcessingEnabled() {
+return (owner instanceof AsyncStateProcessingOperator)
+&& ((AsyncStateProcessingOperator) 
owner).isAsyncStateProcessingEnabled();
+}
+
+@Internal
+@Override
+public final ThrowingConsumer, Exception> 
getRecordProcessor(int inputId) {
+return AsyncStateProcessing.makeRecordProcessor(
+(AsyncStateProcessingOperator) owner,
+(KeySelector) stateKeySelector,
+this::processElement);
+}
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 632b89b16ae..76fef0c26cf 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -128,7 +128,7 @@ public abstract class AbstractStreamOperator
  *
  * This is for elements from the first input.
  */
-private transient KeySelector stateKeySelector1;
+protected transient KeySelector stateKeySelector1;
 
 /**
  * {@code KeySelector} for extracting a key from an element being 
processed. This is used to
@@ -136,7 +136,7 @@ public abstract class AbstractStreamOperator
  *
  * This is for elements from the second input.
  */
-private transient KeySelector stateKeySelector2;
+protected transient KeySelector stateKeySelector2;
 
 privat

(flink) branch master updated: [FLINK-34936][Checkpointing] Register reused shared state handle to FileMergingSnapshotManager

2024-04-17 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 31ea1a9358f [FLINK-34936][Checkpointing] Register reused shared state 
handle to FileMergingSnapshotManager
31ea1a9358f is described below

commit 31ea1a9358f4aaa1083c6fbb7bdee070be29f4e6
Author: Zakelly 
AuthorDate: Tue Apr 9 20:16:31 2024 +0800

[FLINK-34936][Checkpointing] Register reused shared state handle to 
FileMergingSnapshotManager
---
 .../filemerging/FileMergingSnapshotManager.java| 12 +
 .../FileMergingSnapshotManagerBase.java| 38 +++--
 .../metadata/MetadataV2V3SerializerBase.java   |  6 ++-
 .../runtime/state/CheckpointStreamFactory.java | 12 +
 .../filemerging/EmptySegmentFileStateHandle.java   | 15 --
 .../state/filemerging/SegmentFileStateHandle.java  | 21 ++--
 .../FsMergingCheckpointStorageLocation.java|  6 +++
 .../FileMergingSnapshotManagerTestBase.java| 62 ++
 .../checkpoint/metadata/CheckpointTestUtils.java   |  8 ++-
 ...FileMergingCheckpointStateOutputStreamTest.java |  8 ++-
 .../snapshot/RocksIncrementalSnapshotStrategy.java | 15 +-
 11 files changed, 189 insertions(+), 14 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
index afc5eb618dc..2aa32ba65c0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
@@ -24,12 +24,14 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.state.filemerging.DirectoryStreamStateHandle;
 import 
org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream;
 import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageAccess;
 
 import java.io.Closeable;
+import java.util.Collection;
 
 /**
  * FileMergingSnapshotManager provides an interface to manage files and meta 
information for
@@ -157,6 +159,16 @@ public interface FileMergingSnapshotManager extends 
Closeable {
  */
 void notifyCheckpointSubsumed(SubtaskKey subtaskKey, long checkpointId) 
throws Exception;
 
+/**
+ * A callback method which is called when previous state handles are 
reused by following
+ * checkpoint(s).
+ *
+ * @param checkpointId the checkpoint that reuses the handles.
+ * @param stateHandles the handles to be reused.
+ */
+void reusePreviousStateHandle(
+long checkpointId, Collection 
stateHandles);
+
 /**
  * A key identifies a subtask. A subtask can be identified by the operator 
id, subtask index and
  * the parallelism. Note that this key should be consistent across job 
attempts.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java
index d46edcd235e..db90d654d61 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java
@@ -27,6 +27,7 @@ import org.apache.flink.core.fs.OutputStreamAndPath;
 import org.apache.flink.core.fs.Path;
 import 
org.apache.flink.runtime.checkpoint.filemerging.LogicalFile.LogicalFileId;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filemerging.DirectoryStreamStateHandle;
 import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle;
 import 
org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream;
@@ -42,6 +43,7 @@ import javax.annotation.concurrent.GuardedBy;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
@@ -70,6 +72,9 @@ public abstract class FileMergingSnapshotManagerBase 
implements FileMergingSnaps
 @GuardedBy("lock")
 protected TreeMap> uploadedStates = new TreeMap<>();
 
+/** The map that holds all the known live logical files. */
+  

(flink) 03/03: [FLINK-35025][Runtime/State] Introduce element order for async state processing

2024-04-17 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fe8dde4ea489f466982ba896c1e2b22a7466e719
Author: Zakelly 
AuthorDate: Mon Apr 15 20:53:47 2024 +0800

[FLINK-35025][Runtime/State] Introduce element order for async state 
processing
---
 .../asyncprocessing/AsyncExecutionController.java  |  21 
 .../asyncprocessing/StateRequestBuffer.java|   8 +-
 .../AsyncExecutionControllerTest.java  |  44 +++
 .../AbstractAsyncStateStreamOperator.java  |  42 ---
 .../AbstractAsyncStateStreamOperatorV2.java|  17 +++
 .../asyncprocessing/AsyncStateProcessing.java  |  23 +++-
 .../AsyncStateProcessingOperator.java  |  12 ++
 ...teProcessingOperator.java => ElementOrder.java} |  28 +++--
 .../AbstractAsyncStateStreamOperatorTest.java  | 108 -
 .../AbstractAsyncStateStreamOperatorV2Test.java| 130 +++--
 10 files changed, 380 insertions(+), 53 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
index 8dd8d16e721..1964ec3f378 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
@@ -21,7 +21,9 @@ package org.apache.flink.runtime.asyncprocessing;
 import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.common.state.v2.State;
 import org.apache.flink.core.state.InternalStateFuture;
+import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.ThrowingRunnable;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -248,4 +250,23 @@ public class AsyncExecutionController {
 setCurrentContext(storedContext);
 inFlightRecordNum.incrementAndGet();
 }
+
+/**
+ * A helper to request a {@link StateRequestType#SYNC_POINT} and run a 
callback if it finishes
+ * (once the record is not blocked).
+ *
+ * @param callback the callback to run if it finishes (once the record is 
not blocked).
+ */
+public void syncPointRequestWithCallback(ThrowingRunnable 
callback) {
+handleRequest(null, StateRequestType.SYNC_POINT, null)
+.thenAccept(
+v -> {
+try {
+callback.run();
+} catch (Exception e) {
+// TODO: Properly handle the exception and 
fail the entire job.
+throw new FlinkRuntimeException("Unexpected 
runtime exception", e);
+}
+});
+}
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java
index 0eea7bbcacc..9fa10bfee5b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java
@@ -60,7 +60,11 @@ public class StateRequestBuffer {
 }
 
 void enqueueToActive(StateRequest request) {
-activeQueue.add(request);
+if (request.getRequestType() == StateRequestType.SYNC_POINT) {
+request.getFuture().complete(null);
+} else {
+activeQueue.add(request);
+}
 }
 
 void enqueueToBlocking(StateRequest request) {
@@ -83,7 +87,7 @@ public class StateRequestBuffer {
 }
 
 StateRequest stateRequest = 
blockingQueue.get(key).removeFirst();
-activeQueue.add(stateRequest);
+enqueueToActive(stateRequest);
 if (blockingQueue.get(key).isEmpty()) {
 blockingQueue.remove(key);
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
index 5d661958fc4..d36a5e53ae6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
@@ -300,6 +300,50 @@ class AsyncExecutionControllerTest {
 }
 }
 
+@Test
+public void testSyncPoint() {
+AtomicInteger counter = new AtomicInteger(0);
+
+// Test the sync point processing without a key occupied.
+RecordContext recordContext = aec.buil

(flink) branch master updated (31ea1a9358f -> fe8dde4ea48)

2024-04-17 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 31ea1a9358f [FLINK-34936][Checkpointing] Register reused shared state 
handle to FileMergingSnapshotManager
 new c7be45d0ba9 [FLINK-35025][Runtime/State] Erase the type parameter of 
record within AEC and related components.
 new e1edb24ecc2 [FLINK-35025][Runtime/State] Abstract stream operators for 
async state processing
 new fe8dde4ea48 [FLINK-35025][Runtime/State] Introduce element order for 
async state processing

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../asyncprocessing/AsyncExecutionController.java  |  44 +++-
 .../asyncprocessing/ContextStateFutureImpl.java|   4 +-
 .../runtime/asyncprocessing/KeyAccountingUnit.java |   9 +-
 .../runtime/asyncprocessing/RecordContext.java |  14 +-
 .../asyncprocessing/StateFutureFactory.java|   9 +-
 .../runtime/asyncprocessing/StateRequest.java  |   6 +-
 .../asyncprocessing/StateRequestBuffer.java|  29 ++-
 .../AsyncExecutionControllerTest.java  |  68 +-
 .../ContextStateFutureImplTest.java|  10 +-
 .../asyncprocessing/KeyAccountingUnitTest.java |   2 +-
 .../streaming/api/operators/AbstractInput.java |  23 +-
 .../api/operators/AbstractStreamOperator.java  |   4 +-
 .../api/operators/AbstractStreamOperatorV2.java|   2 +-
 .../streaming/runtime/io/RecordProcessorUtils.java |  10 +
 .../AbstractAsyncStateStreamOperator.java  | 145 
 .../AbstractAsyncStateStreamOperatorV2.java| 125 +++
 .../asyncprocessing/AsyncStateProcessing.java  |  78 +++
 .../AsyncStateProcessingOperator.java  |  56 +
 .../operators/asyncprocessing/ElementOrder.java|  42 
 .../AbstractAsyncStateStreamOperatorTest.java  | 196 
 .../AbstractAsyncStateStreamOperatorV2Test.java| 248 +
 .../multipleinput/input/FirstInputOfTwoInput.java  |  18 +-
 .../operators/multipleinput/input/OneInput.java|  18 +-
 .../multipleinput/input/SecondInputOfTwoInput.java |  18 +-
 24 files changed, 1108 insertions(+), 70 deletions(-)
 create mode 100644 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java
 create mode 100644 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java
 create mode 100644 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AsyncStateProcessing.java
 create mode 100644 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AsyncStateProcessingOperator.java
 create mode 100644 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/ElementOrder.java
 create mode 100644 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorTest.java
 create mode 100644 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2Test.java



(flink) 01/03: [FLINK-35025][Runtime/State] Erase the type parameter of record within AEC and related components.

2024-04-17 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c7be45d0ba9c4ab6c2ce7757b6c8cd8e1f0fe539
Author: Zakelly 
AuthorDate: Fri Apr 12 18:34:07 2024 +0800

[FLINK-35025][Runtime/State] Erase the type parameter of record within AEC 
and related components.
---
 .../asyncprocessing/AsyncExecutionController.java  | 23 ++---
 .../asyncprocessing/ContextStateFutureImpl.java|  4 ++--
 .../runtime/asyncprocessing/KeyAccountingUnit.java |  9 
 .../runtime/asyncprocessing/RecordContext.java | 14 ++---
 .../asyncprocessing/StateFutureFactory.java|  9 
 .../runtime/asyncprocessing/StateRequest.java  |  6 +++---
 .../asyncprocessing/StateRequestBuffer.java| 21 +++
 .../AsyncExecutionControllerTest.java  | 24 +++---
 .../ContextStateFutureImplTest.java| 10 -
 .../asyncprocessing/KeyAccountingUnitTest.java |  2 +-
 10 files changed, 61 insertions(+), 61 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
index cf8304a71ea..8dd8d16e721 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
@@ -42,10 +42,9 @@ import java.util.concurrent.atomic.AtomicInteger;
  * (back-pressure). It invokes {@link MailboxExecutor#yield()} to pause 
current operations,
  * allowing for the execution of callbacks (mails in Mailbox).
  *
- * @param <R> the type of the record
  * @param <K> the type of the key
  */
-public class AsyncExecutionController<R, K> {
+public class AsyncExecutionController<K> {
 
 private static final Logger LOG = 
LoggerFactory.getLogger(AsyncExecutionController.class);
 
@@ -70,22 +69,22 @@ public class AsyncExecutionController {
 private final MailboxExecutor mailboxExecutor;
 
 /** The key accounting unit which is used to detect the key conflict. */
-final KeyAccountingUnit<R, K> keyAccountingUnit;
+final KeyAccountingUnit<K> keyAccountingUnit;
 
 /**
  * A factory to build {@link 
org.apache.flink.core.state.InternalStateFuture}, this will auto
  * wire the created future with mailbox executor. Also conducting the 
context switch.
  */
-private final StateFutureFactory<R, K> stateFutureFactory;
+private final StateFutureFactory<K> stateFutureFactory;
 
 /** The state executor where the {@link StateRequest} is actually 
executed. */
 final StateExecutor stateExecutor;
 
 /** The corresponding context that currently runs in task thread. */
-RecordContext<R, K> currentContext;
+RecordContext<K> currentContext;
 
 /** The buffer to store the state requests to execute in batch. */
-StateRequestBuffer<R, K> stateRequestsBuffer;
+StateRequestBuffer<K> stateRequestsBuffer;
 
 /**
  * The number of in-flight records. Including the records in active buffer 
and blocking buffer.
@@ -123,7 +122,7 @@ public class AsyncExecutionController {
  * @param key the given key.
  * @return the built record context.
  */
-public RecordContext<R, K> buildContext(R record, K key) {
+public RecordContext<K> buildContext(Object record, K key) {
 return new RecordContext<>(record, key, this::disposeContext);
 }
 
@@ -133,7 +132,7 @@ public class AsyncExecutionController {
  *
  * @param switchingContext the context to switch.
  */
-public void setCurrentContext(RecordContext<R, K> switchingContext) {
+public void setCurrentContext(RecordContext<K> switchingContext) {
 currentContext = switchingContext;
 }
 
@@ -142,10 +141,10 @@ public class AsyncExecutionController {
  *
  * @param toDispose the context to dispose.
  */
-public void disposeContext(RecordContext<R, K> toDispose) {
+public void disposeContext(RecordContext<K> toDispose) {
 keyAccountingUnit.release(toDispose.getRecord(), toDispose.getKey());
 inFlightRecordNum.decrementAndGet();
-RecordContext<R, K> nextRecordCtx =
+RecordContext<K> nextRecordCtx =
 stateRequestsBuffer.tryActivateOneByKey(toDispose.getKey());
 if (nextRecordCtx != null) {
 Preconditions.checkState(
@@ -160,7 +159,7 @@ public class AsyncExecutionController {
  * @param recordContext the given context.
  * @return true if occupy succeed or the key has already occupied by this 
context.
  */
-boolean tryOccupyKey(RecordContext<R, K> recordContext) {
+boolean tryOccupyKey(RecordContext<K> recordContext) {
 boolean occupied = recordContext.isKeyOccupied();
 if (!occupied
 && keyAccount

(flink) branch master updated: [FLINK-32440][checkpoint] Introduce file merging configuration (#22973)

2024-04-14 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new c601c70e5c4 [FLINK-32440][checkpoint] Introduce file merging 
configuration (#22973)
c601c70e5c4 is described below

commit c601c70e5c4a9316bee049c613c011b14fab7f5e
Author: Yanfei Lei 
AuthorDate: Mon Apr 15 14:21:40 2024 +0800

[FLINK-32440][checkpoint] Introduce file merging configuration (#22973)
---
 .../generated/checkpoint_file_merging_section.html |  36 +++
 .../generated/checkpointing_configuration.html |  24 +
 .../flink/annotation/docs/Documentation.java   |   2 +
 .../flink/configuration/CheckpointingOptions.java  | 107 +
 .../FileMergingSnapshotManagerBuilder.java |   4 +-
 .../runtime/state/CheckpointStorageWorkerView.java |   2 +-
 .../state/TaskExecutorFileMergingManager.java  |  45 -
 .../flink/runtime/taskexecutor/TaskExecutor.java   |   5 +-
 .../state/TaskExecutorFileMergingManagerTest.java  |  14 ++-
 .../flink/streaming/runtime/tasks/StreamTask.java  |  25 -
 10 files changed, 248 insertions(+), 16 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/checkpoint_file_merging_section.html 
b/docs/layouts/shortcodes/generated/checkpoint_file_merging_section.html
new file mode 100644
index 000..50ee139e0ee
--- /dev/null
+++ b/docs/layouts/shortcodes/generated/checkpoint_file_merging_section.html
@@ -0,0 +1,36 @@
+
+
+
+Key
+Default
+Type
+Description
+
+
+
+
+state.checkpoints.file-merging.enabled
+false
+Boolean
+Whether to enable merging multiple checkpoint files into one, 
which will greatly reduce the number of small checkpoint files. This is an 
experimental feature under evaluation, make sure you're aware of the possible 
effects of enabling it.
+
+
+
state.checkpoints.file-merging.across-checkpoint-boundary
+false
+Boolean
+Only relevant if state.checkpoints.file-merging.enabled is 
enabled. Whether to allow merging data of multiple checkpoints into one 
physical file. If this option is set to false, only merge files within 
checkpoint boundaries. Otherwise, it is possible for the logical files of 
different checkpoints to share the same physical file.
+
+
+state.checkpoints.file-merging.max-file-size
+32 mb
+MemorySize
+Max size of a physical file for merged checkpoints.
+
+
+state.checkpoints.file-merging.pool-blocking
+false
+Boolean
+Whether to use Blocking or Non-Blocking pool for merging 
physical files. A Non-Blocking pool will always provide usable physical file 
without blocking. It may create many physical files if poll file frequently. 
When poll a small file from a Blocking pool, it may be blocked until the file 
is returned.
+
+
+
diff --git a/docs/layouts/shortcodes/generated/checkpointing_configuration.html 
b/docs/layouts/shortcodes/generated/checkpointing_configuration.html
index c87a9c33803..3b9fda388e4 100644
--- a/docs/layouts/shortcodes/generated/checkpointing_configuration.html
+++ b/docs/layouts/shortcodes/generated/checkpointing_configuration.html
@@ -44,6 +44,30 @@
 String
 The default directory used for storing the data files and meta 
data of checkpoints in a Flink supported filesystem. The storage path must be 
accessible from all participating processes/nodes(i.e. all TaskManagers and 
JobManagers). If the 'state.checkpoint-storage' is set to 'jobmanager', only 
the meta data of checkpoints will be stored in this directory.
 
+
+
state.checkpoints.file-merging.across-checkpoint-boundary
+false
+Boolean
+Only relevant if state.checkpoints.file-merging.enabled is 
enabled. Whether to allow merging data of multiple checkpoints into one 
physical file. If this option is set to false, only merge files within 
checkpoint boundaries. Otherwise, it is possible for the logical files of 
different checkpoints to share the same physical file.
+
+
+state.checkpoints.file-merging.enabled
+false
+Boolean
+Whether to enable merging multiple checkpoint files into one, 
which will greatly reduce the number of small checkpoint files. This is an 
experimental feature under evaluation, make sure you're aware of the possible 
effects of enabling it.
+
+
+state.checkpoints.file-merging.max-file-size
+32 mb
+MemorySize
+Max size of a physic

(flink) branch master updated: [FLINK-35024][Runtime/State] Implement the record buffer of AsyncExecutionController (#24633)

2024-04-14 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 6d139a19c80 [FLINK-35024][Runtime/State] Implement the record buffer 
of AsyncExecutionController (#24633)
6d139a19c80 is described below

commit 6d139a19c809f787317c5afa4e56e1c544125e5f
Author: Yanfei Lei 
AuthorDate: Mon Apr 15 13:56:53 2024 +0800

[FLINK-35024][Runtime/State] Implement the record buffer of 
AsyncExecutionController (#24633)
---
 .../asyncprocessing/AsyncExecutionController.java  |  98 +++-
 .../asyncprocessing/StateRequestBuffer.java| 125 ++
 .../AsyncExecutionControllerTest.java  | 259 ++---
 3 files changed, 385 insertions(+), 97 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
index ffecc5f9687..cf8304a71ea 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
@@ -21,12 +21,16 @@ package org.apache.flink.runtime.asyncprocessing;
 import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.common.state.v2.State;
 import org.apache.flink.core.state.InternalStateFuture;
+import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
 /**
  * The Async Execution Controller (AEC) receives processing requests from 
operators, and put them
  * into execution according to some strategies.
@@ -45,11 +49,26 @@ public class AsyncExecutionController {
 
 private static final Logger LOG = 
LoggerFactory.getLogger(AsyncExecutionController.class);
 
+public static final int DEFAULT_BATCH_SIZE = 1000;
 public static final int DEFAULT_MAX_IN_FLIGHT_RECORD_NUM = 6000;
 
+/**
+ * The batch size. When the number of state requests in the active buffer 
exceeds the batch
+ * size, a batched state execution would be triggered.
+ */
+private final int batchSize;
+
 /** The max allowed number of in-flight records. */
 private final int maxInFlightRecordNum;
 
+/**
+ * The mailbox executor borrowed from {@code StreamTask}. Keeping the 
reference of
+ * mailboxExecutor here is to restrict the number of in-flight records, 
when the number of
+ * in-flight records > {@link #maxInFlightRecordNum}, the newly entering 
records would be
+ * blocked.
+ */
+private final MailboxExecutor mailboxExecutor;
+
 /** The key accounting unit which is used to detect the key conflict. */
 final KeyAccountingUnit keyAccountingUnit;
 
@@ -65,17 +84,35 @@ public class AsyncExecutionController {
 /** The corresponding context that currently runs in task thread. */
 RecordContext currentContext;
 
+/** The buffer to store the state requests to execute in batch. */
+StateRequestBuffer stateRequestsBuffer;
+
+/**
+ * The number of in-flight records. Including the records in active buffer 
and blocking buffer.
+ */
+final AtomicInteger inFlightRecordNum;
+
 public AsyncExecutionController(MailboxExecutor mailboxExecutor, 
StateExecutor stateExecutor) {
-this(mailboxExecutor, stateExecutor, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
+this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, 
DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
 }
 
 public AsyncExecutionController(
-MailboxExecutor mailboxExecutor, StateExecutor stateExecutor, int 
maxInFlightRecords) {
+MailboxExecutor mailboxExecutor,
+StateExecutor stateExecutor,
+int batchSize,
+int maxInFlightRecords) {
 this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
+this.mailboxExecutor = mailboxExecutor;
 this.stateFutureFactory = new StateFutureFactory<>(this, 
mailboxExecutor);
 this.stateExecutor = stateExecutor;
+this.batchSize = batchSize;
 this.maxInFlightRecordNum = maxInFlightRecords;
-LOG.info("Create AsyncExecutionController: maxInFlightRecordsNum {}", 
maxInFlightRecords);
+this.stateRequestsBuffer = new StateRequestBuffer<>();
+this.inFlightRecordNum = new AtomicInteger(0);
+LOG.info(
+"Create AsyncExecutionController: batchSize {}, 
maxInFlightRecordsNum {}",
+batchSize,
+maxInFlightRecords);
 }
 
 /**
@@ -107,6 +144,14 @@ public class AsyncExecutionController {
  */

(flink) 02/03: [FLINK-34986][Runtime/State] Basic implementation of AEC, RecordContext and reference counting

2024-04-10 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 294084872604d599c1e2c52302ce0f08bb0e6492
Author: Zakelly 
AuthorDate: Wed Apr 3 18:39:17 2024 +0800

[FLINK-34986][Runtime/State] Basic implementation of AEC, RecordContext and 
reference counting
---
 .../flink/core/state/CompletedStateFuture.java |   5 +
 .../flink/core/state/InternalStateFuture.java  |   3 +
 .../apache/flink/core/state/StateFutureImpl.java   |  48 +++-
 .../asyncprocessing/AsyncExecutionController.java  | 170 
 .../asyncprocessing/ContextStateFutureImpl.java|  75 +
 .../KeyAccountingUnit.java |  51 ++--
 .../RecordContext.java |  57 +++-
 .../ReferenceCounted.java  |  19 +-
 .../runtime/asyncprocessing/StateExecutor.java |  21 +-
 .../asyncprocessing/StateFutureFactory.java|  51 
 .../runtime/asyncprocessing/StateRequest.java  |  88 ++
 .../runtime/asyncprocessing/StateRequestType.java  | 103 +++
 .../taskprocessing/AsyncExecutionController.java   |  80 --
 .../runtime/taskprocessing/OrderPreserveMode.java  |  38 ---
 .../runtime/taskprocessing/ProcessingRequest.java  |  69 -
 .../runtime/taskprocessing/StateExecutor.java  |  19 --
 .../AsyncExecutionControllerTest.java  | 301 +
 .../KeyAccountingUnitTest.java}|  21 +-
 .../ReferenceCountedTest.java  |  11 +-
 .../AsyncExecutionControllerTest.java  | 280 ---
 20 files changed, 938 insertions(+), 572 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/core/state/CompletedStateFuture.java
 
b/flink-core/src/main/java/org/apache/flink/core/state/CompletedStateFuture.java
index 615b8e78c32..13b7de90cee 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/state/CompletedStateFuture.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/state/CompletedStateFuture.java
@@ -67,6 +67,11 @@ public class CompletedStateFuture implements 
InternalStateFuture {
 });
 }
 
+@Override
+public void complete(T result) {
+throw new UnsupportedOperationException("This state future has already 
been completed.");
+}
+
 @Override
 public void thenSyncAccept(Consumer action) {
 action.accept(result);
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/state/InternalStateFuture.java 
b/flink-core/src/main/java/org/apache/flink/core/state/InternalStateFuture.java
index 7fd6d7485ff..bbbd4a94f71 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/state/InternalStateFuture.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/state/InternalStateFuture.java
@@ -29,6 +29,9 @@ import java.util.function.Consumer;
 @Internal
 public interface InternalStateFuture extends StateFuture {
 
+/** Complete this future. */
+void complete(T result);
+
 /**
  * Accept the action in the same thread with the one of complete (or 
current thread if it has
  * been completed).
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java 
b/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java
index dfae92c845f..63de46cc91e 100644
--- a/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java
+++ b/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java
@@ -40,11 +40,11 @@ import java.util.function.Function;
 @Internal
 public class StateFutureImpl implements InternalStateFuture {
 
-/** The future holds the result. The completes in async threads. */
-CompletableFuture completableFuture;
+/** The future holds the result. This may complete in async threads. */
+private final CompletableFuture completableFuture;
 
 /** The callback runner. */
-CallbackRunner callbackRunner;
+protected final CallbackRunner callbackRunner;
 
 public StateFutureImpl(CallbackRunner callbackRunner) {
 this.completableFuture = new CompletableFuture<>();
@@ -53,17 +53,20 @@ public class StateFutureImpl implements 
InternalStateFuture {
 
 @Override
 public  StateFuture thenApply(Function fn) {
+callbackRegistered();
 try {
 if (completableFuture.isDone()) {
 U r = fn.apply(completableFuture.get());
+callbackFinished();
 return StateFutureUtils.completedFuture(r);
 } else {
-StateFutureImpl ret = new StateFutureImpl<>(callbackRunner);
+StateFutureImpl ret = makeNewStateFuture();
 completableFuture.thenAccept(
 (t) -> {
 callbackRunner.submit(
 () -> {

(flink) 01/03: [FLINK-34986][Runtime/State] Introduce element-control component

2024-04-10 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 60c771b987af73382f8a41ccf7fecfdbe6dc8b1c
Author: fredia 
AuthorDate: Thu Mar 21 17:51:23 2024 +0800

[FLINK-34986][Runtime/State] Introduce element-control component
---
 .../apache/flink/core/state/StateFutureImpl.java   |   2 +-
 .../taskprocessing/AsyncExecutionController.java   |  80 ++
 .../runtime/taskprocessing/KeyAccountingUnit.java  |  82 ++
 .../runtime/taskprocessing/OrderPreserveMode.java  |  38 +++
 .../runtime/taskprocessing/ProcessingRequest.java  |  69 +
 .../runtime/taskprocessing/RecordContext.java  |  82 ++
 .../runtime/taskprocessing/ReferenceCounted.java   |  98 
 .../runtime/taskprocessing/StateExecutor.java  |  19 ++
 .../AsyncExecutionControllerTest.java  | 280 +
 .../taskprocessing/ReferenceCountedTest.java   |  71 ++
 .../taskprocessing/keyAccountingUnitTest.java  |  43 
 11 files changed, 863 insertions(+), 1 deletion(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java 
b/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java
index 3cdc873e789..dfae92c845f 100644
--- a/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java
+++ b/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java
@@ -46,7 +46,7 @@ public class StateFutureImpl implements 
InternalStateFuture {
 /** The callback runner. */
 CallbackRunner callbackRunner;
 
-StateFutureImpl(CallbackRunner callbackRunner) {
+public StateFutureImpl(CallbackRunner callbackRunner) {
 this.completableFuture = new CompletableFuture<>();
 this.callbackRunner = callbackRunner;
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/AsyncExecutionController.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/AsyncExecutionController.java
new file mode 100644
index 000..133e6dcd27a
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/AsyncExecutionController.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskprocessing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+
+/**
+ * The Async Execution Controller (AEC) receives processing requests from 
operators, and put them
+ * into execution according to some strategies.
+ *
+ * It is responsible for:
+ * Preserving the sequence of elements bearing the same key by delaying 
subsequent requests
+ * until the processing of preceding ones is finalized.
+ * Tracking the in-flight data(records) and blocking the input if too much 
data in flight
+ * (back-pressure). It invokes {@link MailboxExecutor#yield()} to pause 
current operations,
+ * allowing for the execution of callbacks (mails in Mailbox).
+ *
+ * @param <R> the type of the record
+ * @param <K> the type of the key
+ */
+@Internal
+public class AsyncExecutionController<R, K> {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(AsyncExecutionController.class);
+
+public static final int DEFAULT_MAX_IN_FLIGHT_RECORD_NUM = 6000;
+
+/** The max allow number of in-flight records. */
+private final int maxInFlightRecordNum;
+
+/** The key accounting unit which is used to detect the key conflict. */
+private final KeyAccountingUnit keyAccountingUnit;
+
+/** The mailbox executor, borrowed from {@code StreamTask}. */
+private final MailboxExecutor mailboxExecutor;
+
+/** The state executor where the {@link ProcessingRequest} is actually 
executed. */
+private final StateExecutor stateExecutor;
+
+public AsyncExecutionController(MailboxExecutor mailboxExecutor, 
StateExecutor stateExecutor) {
+this(mailboxExecutor, stateExecutor, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
+}
+
+publi

(flink) 03/03: [FLINK-34986][Runtime/State] More tests for reference counting of ContextStateFutureImpl

2024-04-10 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ec2a140daed15ef596a70f9fb645c55a23c84a2c
Author: Zakelly 
AuthorDate: Sun Apr 7 11:47:14 2024 +0800

[FLINK-34986][Runtime/State] More tests for reference counting of 
ContextStateFutureImpl
---
 .../apache/flink/core/state/StateFutureImpl.java   |   3 +-
 .../asyncprocessing/ContextStateFutureImpl.java|   2 +
 .../AsyncExecutionControllerTest.java  |   8 +-
 .../ContextStateFutureImplTest.java| 241 +
 4 files changed, 249 insertions(+), 5 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java 
b/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java
index 63de46cc91e..d7db0be5311 100644
--- a/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java
+++ b/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java
@@ -166,7 +166,8 @@ public class StateFutureImpl implements 
InternalStateFuture {
 }
 
 /**
- * Make a new future based on context of this future.
+ * Make a new future based on context of this future. Subclasses need to 
overload this method to
+ * generate their own instances (if needed).
  *
  * @return the new created future.
  */
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImpl.java
index f1f182b25fe..31e89676523 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImpl.java
@@ -32,6 +32,8 @@ import org.apache.flink.core.state.StateFutureImpl;
  * 2. -1 when future completed.
  * 3. +1 when callback registered.
  * 4. -1 when callback finished.
+ * Please refer to {@code ContextStateFutureImplTest} where the reference 
counting is carefully
+ * tested.
  */
 public class ContextStateFutureImpl extends StateFutureImpl {
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
index b3bae8ae6a0..41c2031414d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
@@ -170,7 +170,7 @@ class AsyncExecutionControllerTest {
  * An AsyncExecutionController for testing purpose, which integrates with 
basic buffer
  * mechanism.
  */
-class TestAsyncExecutionController extends 
AsyncExecutionController {
+static class TestAsyncExecutionController extends 
AsyncExecutionController {
 
 LinkedList> activeBuffer;
 
@@ -216,7 +216,7 @@ class AsyncExecutionControllerTest {
 }
 
 /** Simulate the underlying state that is actually used to execute the 
request. */
-class TestUnderlyingState {
+static class TestUnderlyingState {
 
 private final HashMap hashMap;
 
@@ -233,7 +233,7 @@ class AsyncExecutionControllerTest {
 }
 }
 
-class TestValueState implements ValueState {
+static class TestValueState implements ValueState {
 
 private final AsyncExecutionController 
asyncExecutionController;
 
@@ -266,7 +266,7 @@ class AsyncExecutionControllerTest {
  * A brief implementation of {@link StateExecutor}, to illustrate the 
interaction between AEC
  * and StateExecutor.
  */
-class TestStateExecutor implements StateExecutor {
+static class TestStateExecutor implements StateExecutor {
 
 public TestStateExecutor() {}
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImplTest.java
new file mode 100644
index 000..d0a169f6444
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImplTest.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distribu

(flink) branch master updated (54f789d21ef -> ec2a140daed)

2024-04-10 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 54f789d21ef [hotfix][doc] Generate and use docs of RpcOptions instead 
of AkkaOptions (#24629)
 new 60c771b987a [FLINK-34986][Runtime/State] Introduce element-control 
component
 new 29408487260 [FLINK-34986][Runtime/State] Basic implementation of AEC, 
RecordContext and reference counting
 new ec2a140daed [FLINK-34986][Runtime/State] More tests for reference 
counting of ContextStateFutureImpl

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/core/state/CompletedStateFuture.java |   5 +
 .../flink/core/state/InternalStateFuture.java  |   3 +
 .../apache/flink/core/state/StateFutureImpl.java   |  51 +++-
 .../asyncprocessing/AsyncExecutionController.java  | 170 
 .../asyncprocessing/ContextStateFutureImpl.java|  77 ++
 .../runtime/asyncprocessing/KeyAccountingUnit.java |  67 +
 .../runtime/asyncprocessing/RecordContext.java | 119 
 .../runtime/asyncprocessing/ReferenceCounted.java  | 103 +++
 .../runtime/asyncprocessing/StateExecutor.java |  21 +-
 .../asyncprocessing/StateFutureFactory.java|  51 
 .../runtime/asyncprocessing/StateRequest.java  |  88 ++
 .../runtime/asyncprocessing/StateRequestType.java  | 103 +++
 .../AsyncExecutionControllerTest.java  | 301 +
 .../ContextStateFutureImplTest.java| 241 +
 .../asyncprocessing/KeyAccountingUnitTest.java |  40 +++
 .../asyncprocessing/ReferenceCountedTest.java  |  72 +
 16 files changed, 1492 insertions(+), 20 deletions(-)
 create mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
 create mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImpl.java
 create mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/KeyAccountingUnit.java
 create mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java
 create mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ReferenceCounted.java
 copy 
flink-core/src/main/java/org/apache/flink/core/state/InternalStateFuture.java 
=> 
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateExecutor.java
 (63%)
 create mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateFutureFactory.java
 create mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java
 create mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestType.java
 create mode 100644 
flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
 create mode 100644 
flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImplTest.java
 create mode 100644 
flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/KeyAccountingUnitTest.java
 create mode 100644 
flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/ReferenceCountedTest.java



(flink) branch master updated: [hotfix][doc] Generate and use docs of RpcOptions instead of AkkaOptions (#24629)

2024-04-10 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 54f789d21ef [hotfix][doc] Generate and use docs of RpcOptions instead 
of AkkaOptions (#24629)
54f789d21ef is described below

commit 54f789d21ef8616f5ea2aa495ffecb56c0430bc9
Author: Zakelly 
AuthorDate: Thu Apr 11 10:07:12 2024 +0800

[hotfix][doc] Generate and use docs of RpcOptions instead of AkkaOptions 
(#24629)
---
 docs/content.zh/docs/deployment/config.md   | 2 +-
 docs/content/docs/deployment/config.md  | 2 +-
 .../generated/{akka_configuration.html => rpc_configuration.html}   | 0
 3 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/content.zh/docs/deployment/config.md 
b/docs/content.zh/docs/deployment/config.md
index 56d9206ab06..711d97aef40 100644
--- a/docs/content.zh/docs/deployment/config.md
+++ b/docs/content.zh/docs/deployment/config.md
@@ -562,7 +562,7 @@ These options are for the network stack that handles the 
streaming and batch dat
 Flink uses Pekko for RPC between components 
(JobManager/TaskManager/ResourceManager).
 Flink does not use Pekko for data transport.
 
-{{< generated/akka_configuration >}}
+{{< generated/rpc_configuration >}}
 
 
 
diff --git a/docs/content/docs/deployment/config.md 
b/docs/content/docs/deployment/config.md
index f7927e260b4..cc903eb34f2 100644
--- a/docs/content/docs/deployment/config.md
+++ b/docs/content/docs/deployment/config.md
@@ -564,7 +564,7 @@ These options are for the network stack that handles the 
streaming and batch dat
 Flink uses Pekko for RPC between components 
(JobManager/TaskManager/ResourceManager).
 Flink does not use Pekko for data transport.
 
-{{< generated/akka_configuration >}}
+{{< generated/rpc_configuration >}}
 
 
 
diff --git a/docs/layouts/shortcodes/generated/akka_configuration.html 
b/docs/layouts/shortcodes/generated/rpc_configuration.html
similarity index 100%
rename from docs/layouts/shortcodes/generated/akka_configuration.html
rename to docs/layouts/shortcodes/generated/rpc_configuration.html



(flink) branch master updated: [FLINK-28440][State/changelog] Increase the materialization Interval of ChangelogRecoveryITCase#testMaterialization

2024-04-09 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 10522ff67ec [FLINK-28440][State/changelog] Increase the 
materialization Interval of ChangelogRecoveryITCase#testMaterialization
10522ff67ec is described below

commit 10522ff67ec1b120374deb181ddaf4a4ee7bffb9
Author: fredia 
AuthorDate: Tue Apr 9 16:17:58 2024 +0800

[FLINK-28440][State/changelog] Increase the materialization Interval of 
ChangelogRecoveryITCase#testMaterialization
---
 .../org/apache/flink/test/checkpointing/ChangelogRecoveryITCase.java| 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoveryITCase.java
index e554ab75f96..b5976633d4e 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoveryITCase.java
@@ -92,7 +92,7 @@ public class ChangelogRecoveryITCase extends 
ChangelogRecoveryITCaseBase {
 SharedReference> currentMaterializationId =
 sharedObjects.add(ConcurrentHashMap.newKeySet());
 StreamExecutionEnvironment env =
-getEnv(delegatedStateBackend, checkpointFolder, 100, 2, 50, 0);
+getEnv(delegatedStateBackend, checkpointFolder, 100, 2, 200, 
0);
 waitAndAssert(
 buildJobGraph(
 env,



(flink) branch master updated: [FLINK-34668][checkpoint] Report operator state handle of file merging directory to JM

2024-03-26 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 05b27be4112 [FLINK-34668][checkpoint] Report operator state handle of 
file merging directory to JM
05b27be4112 is described below

commit 05b27be4112e1f5ca4c5614b1714a202c13343b6
Author: fredia 
AuthorDate: Mon Mar 18 14:05:52 2024 +0800

[FLINK-34668][checkpoint] Report operator state handle of file merging 
directory to JM
---
 .../filemerging/FileMergingSnapshotManager.java|  12 ++
 .../FileMergingSnapshotManagerBase.java|  32 +
 ...efaultOperatorStateBackendSnapshotStrategy.java |  27 +++-
 .../filemerging/DirectoryStreamStateHandle.java|  90 
 .../EmptyFileMergingOperatorStreamStateHandle.java |  64 +
 .../filemerging/EmptySegmentFileStateHandle.java   |  46 ++
 .../FileMergingOperatorStreamStateHandle.java  | 154 +
 .../filemerging/SegmentFileStateHandle.java|   5 +-
 .../FileMergingCheckpointStateOutputStream.java|   2 +-
 .../FsMergingCheckpointStorageLocation.java|  11 ++
 ...ssCheckpointFileMergingSnapshotManagerTest.java |   1 +
 .../FileMergingSnapshotManagerTestBase.java|   1 +
 ...inCheckpointFileMergingSnapshotManagerTest.java |   1 +
 .../runtime/state/OperatorStateBackendTest.java|  85 
 .../runtime/state/SharedStateRegistryTest.java |  86 
 ...FileMergingCheckpointStateOutputStreamTest.java |   2 +-
 .../FsMergingCheckpointStorageLocationTest.java|   2 +-
 17 files changed, 614 insertions(+), 7 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
index cc54854a7a3..afc5eb618dc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.TaskStateManager;
+import org.apache.flink.runtime.state.filemerging.DirectoryStreamStateHandle;
 import 
org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream;
 import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageAccess;
 
@@ -117,6 +118,17 @@ public interface FileMergingSnapshotManager extends 
Closeable {
  */
 Path getManagedDir(SubtaskKey subtaskKey, CheckpointedStateScope scope);
 
+/**
+ * Get the {@link DirectoryStreamStateHandle} of the managed directory, 
created in {@link
+ * #initFileSystem} or {@link #registerSubtaskForSharedStates}.
+ *
+ * @param subtaskKey the subtask key identifying the subtask.
+ * @param scope the checkpoint scope.
+ * @return the {@link DirectoryStreamStateHandle} for one subtask in 
specified checkpoint scope.
+ */
+DirectoryStreamStateHandle getManagedDirStateHandle(
+SubtaskKey subtaskKey, CheckpointedStateScope scope);
+
 /**
  * Notifies the manager that the checkpoint with the given {@code 
checkpointId} completed and
  * was committed.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java
index 3dc9c788561..d46edcd235e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java
@@ -27,6 +27,8 @@ import org.apache.flink.core.fs.OutputStreamAndPath;
 import org.apache.flink.core.fs.Path;
 import 
org.apache.flink.runtime.checkpoint.filemerging.LogicalFile.LogicalFileId;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.filemerging.DirectoryStreamStateHandle;
+import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle;
 import 
org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
@@ -37,6 +39,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nonnull;
 import javax.annotation.concurrent.GuardedBy;
 
+import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.HashSet;
@@ -105,12 +108,24 @@ public abstract class

(flink) branch master updated (f7593524579 -> 501de487de9)

2024-03-19 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from f7593524579 [FLINK-34228] Add long UTF serializer/deserializer
 add 501de487de9 [FLINK-34667][state/changelog] Changelog state backend 
support local rescaling

No new revisions were added by this update.

Summary of changes:
 .../inmemory/InMemoryStateChangelogWriter.java  |  4 +++-
 .../state/changelog/ChangelogKeyedStateBackend.java | 21 -
 .../test/checkpointing/AutoRescalingITCase.java |  3 ---
 3 files changed, 19 insertions(+), 9 deletions(-)



(flink) 01/03: [FLINK-34484][state]Move LOCAL_RECOVERY config from CheckpointingOptions to StateRecoveryOption

2024-03-14 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1d1dca6d6b1a80c4fcd3337489a99c0d834692ea
Author: Jinzhong Li 
AuthorDate: Tue Feb 27 18:08:03 2024 +0800

[FLINK-34484][state]Move LOCAL_RECOVERY config from CheckpointingOptions to 
StateRecoveryOption
---
 .../generated/checkpointing_configuration.html  |  6 --
 .../generated/common_state_backends_section.html|  6 --
 .../generated/state_recovery_configuration.html |  6 ++
 .../flink/configuration/CheckpointingOptions.java   |  4 
 .../flink/configuration/StateRecoveryOptions.java   | 17 +
 .../flink/streaming/tests/AllroundMiniClusterTest.java  |  4 ++--
 .../DefaultSlotPoolServiceSchedulerFactory.java |  3 ++-
 .../scheduler/ExecutionSlotAllocationContext.java   |  3 +--
 .../taskexecutor/TaskManagerServicesConfiguration.java  |  3 ++-
 .../flink/runtime/util/SlotSelectionStrategyUtils.java  |  4 ++--
 .../DefaultSlotPoolServiceSchedulerFactoryTest.java |  4 ++--
 .../state/TaskExecutorLocalStateStoresManagerTest.java  |  3 ++-
 .../runtime/taskexecutor/TaskExecutorRecoveryTest.java  |  4 ++--
 .../runtime/util/SlotSelectionStrategyUtilsTest.java|  6 +++---
 .../flink/test/checkpointing/AutoRescalingITCase.java   |  3 ++-
 .../checkpointing/ChangelogLocalRecoveryITCase.java |  2 +-
 .../flink/test/checkpointing/LocalRecoveryITCase.java   |  4 ++--
 .../checkpointing/NotifyCheckpointAbortedITCase.java|  4 ++--
 .../checkpointing/ResumeCheckpointManuallyITCase.java   |  3 ++-
 .../apache/flink/test/recovery/LocalRecoveryITCase.java |  4 ++--
 .../runtime/DefaultSchedulerLocalRecoveryITCase.java|  4 ++--
 .../org/apache/flink/test/runtime/SchedulingITCase.java |  6 +++---
 .../test/state/ChangelogRecoveryCachingITCase.java  |  2 +-
 .../flink/test/state/ChangelogRescalingITCase.java  |  2 +-
 24 files changed, 63 insertions(+), 44 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/checkpointing_configuration.html 
b/docs/layouts/shortcodes/generated/checkpointing_configuration.html
index 2c40c7e1804..0cd3dd53755 100644
--- a/docs/layouts/shortcodes/generated/checkpointing_configuration.html
+++ b/docs/layouts/shortcodes/generated/checkpointing_configuration.html
@@ -14,12 +14,6 @@
 Boolean
 Option whether the state backend should create incremental 
checkpoints, if possible. For an incremental checkpoint, only a diff from the 
previous checkpoint is stored, rather than the complete checkpoint state. Once 
enabled, the state size shown in web UI or fetched from rest API only 
represents the delta checkpoint size instead of full checkpoint size. Some 
state backends may not support incremental checkpoints and ignore this 
option.
 
-
-state.backend.local-recovery
-false
-Boolean
-This option configures local recovery for this state backend. 
By default, local recovery is deactivated. Local recovery currently only covers 
keyed state backends (including both the EmbeddedRocksDBStateBackend and the 
HashMapStateBackend).
-
 
 state.checkpoint-storage
 (none)
diff --git 
a/docs/layouts/shortcodes/generated/common_state_backends_section.html 
b/docs/layouts/shortcodes/generated/common_state_backends_section.html
index 7d35697849c..a16650bf412 100644
--- a/docs/layouts/shortcodes/generated/common_state_backends_section.html
+++ b/docs/layouts/shortcodes/generated/common_state_backends_section.html
@@ -38,12 +38,6 @@
 Boolean
 Option whether the state backend should create incremental 
checkpoints, if possible. For an incremental checkpoint, only a diff from the 
previous checkpoint is stored, rather than the complete checkpoint state. Once 
enabled, the state size shown in web UI or fetched from rest API only 
represents the delta checkpoint size instead of full checkpoint size. Some 
state backends may not support incremental checkpoints and ignore this 
option.
 
-
-state.backend.local-recovery
-false
-Boolean
-This option configures local recovery for this state backend. 
By default, local recovery is deactivated. Local recovery currently only covers 
keyed state backends (including both the EmbeddedRocksDBStateBackend and the 
HashMapStateBackend).
-
 
 state.checkpoint.cleaner.parallel-mode
 true
diff --git 
a/docs/layouts/shortcodes/generated/state_recovery_configuration.html 
b/docs/layouts/shortcodes/generated/state_recovery_configuration.html
index 5c255b2f820..2a1d914e2fd 100644
--- a/docs/layouts/shortcodes/generated/state_recovery_configuration.html
+++ b/docs/layouts/shortcodes/generated/state_recovery_configuration.html
@@ -14,6 +14,12

(flink) 02/03: [FLINK-34484][state] Split 'state.backend.local-recovery' into two options for checkpointing and recovery

2024-03-14 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ed75795e97800177cb67141ab838632d5ec55bb5
Author: Jinzhong Li 
AuthorDate: Wed Feb 28 12:19:36 2024 +0800

[FLINK-34484][state] Split 'state.backend.local-recovery' into two options 
for checkpointing and recovery
---
 .../generated/checkpointing_configuration.html |  6 
 .../generated/common_state_backends_section.html   |  6 
 .../generated/state_recovery_configuration.html|  2 +-
 .../flink/configuration/CheckpointingOptions.java  | 24 +-
 .../flink/configuration/StateRecoveryOptions.java  | 16 +
 .../fs/DuplicatingStateChangeFsUploader.java   | 10 +++---
 .../changelog/fs/FsStateChangelogStorage.java  |  2 +-
 .../flink/changelog/fs/FsStateChangelogWriter.java |  2 +-
 .../changelog/fs/StateChangeUploadScheduler.java   |  2 +-
 .../api/runtime/SavepointTaskStateManager.java |  2 +-
 .../DefaultSlotPoolServiceSchedulerFactory.java|  1 -
 .../state/ChangelogTaskLocalStateStore.java|  2 +-
 .../state/CheckpointStreamWithResultProvider.java  |  2 +-
 .../flink/runtime/state/LocalRecoveryConfig.java   | 38 +++---
 ...er.java => LocalSnapshotDirectoryProvider.java} | 14 
 ...ava => LocalSnapshotDirectoryProviderImpl.java} | 10 +++---
 .../state/TaskExecutorLocalStateStoresManager.java | 21 +++-
 .../runtime/state/TaskLocalStateStoreImpl.java | 14 +++-
 .../runtime/state/heap/HeapSnapshotStrategy.java   |  2 +-
 .../runtime/taskexecutor/TaskManagerServices.java  |  1 +
 .../TaskManagerServicesConfiguration.java  | 14 ++--
 .../state/ChangelogTaskLocalStateStoreTest.java| 15 +
 .../CheckpointStreamWithResultProviderTest.java|  8 ++---
 ...=> LocalSnapshotDirectoryProviderImplTest.java} | 10 +++---
 .../TaskExecutorLocalStateStoresManagerTest.java   |  2 +-
 .../runtime/state/TaskLocalStateStoreImplTest.java |  7 ++--
 .../runtime/state/TaskStateManagerImplTest.java|  7 ++--
 .../runtime/state/TestLocalRecoveryConfig.java |  6 ++--
 .../state/changelog/ChangelogStateDiscardTest.java |  2 +-
 .../snapshot/RocksDBSnapshotStrategyBase.java  |  8 ++---
 .../runtime/tasks/LocalStateForwardingTest.java|  9 ++---
 .../runtime/tasks/StreamTaskTestHarness.java   |  6 ++--
 .../benchmark/StateBackendBenchmarkUtils.java  |  4 +--
 .../restore/StreamOperatorSnapshotRestoreTest.java | 13 +---
 34 files changed, 194 insertions(+), 94 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/checkpointing_configuration.html 
b/docs/layouts/shortcodes/generated/checkpointing_configuration.html
index 0cd3dd53755..c87a9c33803 100644
--- a/docs/layouts/shortcodes/generated/checkpointing_configuration.html
+++ b/docs/layouts/shortcodes/generated/checkpointing_configuration.html
@@ -8,6 +8,12 @@
 
 
 
+
+execution.checkpointing.local-backup.enabled
+false
+Boolean
+This option configures local backup for the state backend, 
which indicates whether to make a backup checkpoint on local disk. If not 
configured, it falls back to execution.state-recovery.from-local. By default, local 
backup is deactivated. Local backup currently only covers keyed state backends 
(including both the EmbeddedRocksDBStateBackend and the 
HashMapStateBackend).
+
 
 state.backend.incremental
 false
diff --git 
a/docs/layouts/shortcodes/generated/common_state_backends_section.html 
b/docs/layouts/shortcodes/generated/common_state_backends_section.html
index a16650bf412..ab664f51bfb 100644
--- a/docs/layouts/shortcodes/generated/common_state_backends_section.html
+++ b/docs/layouts/shortcodes/generated/common_state_backends_section.html
@@ -32,6 +32,12 @@
 String
 The default directory for savepoints. Used by the state 
backends that write savepoints to file systems (HashMapStateBackend, 
EmbeddedRocksDBStateBackend).
 
+
+execution.state-recovery.from-local
+false
+Boolean
+This option configures local recovery for the state backend, 
which indicates whether to recover from local snapshot. By default, local 
recovery is deactivated. Local recovery currently only covers keyed state 
backends (including both the EmbeddedRocksDBStateBackend and the 
HashMapStateBackend).
+
 
 state.backend.incremental
 false
diff --git 
a/docs/layouts/shortcodes/generated/state_recovery_configuration.html 
b/docs/layouts/shortcodes/generated/state_recovery_configuration.html
index 2a1d914e2fd..2df071ea4d5 100644
--- a/docs/layouts/shortcodes/generated/state_recovery_configuration.html
+++ b/docs/layouts/shortcodes/generated/state_recovery_configuration.html
@@ -18,7 +18,7 @@

(flink) 03/03: [FLINK-34484][tests] Add UT/IT tests to verify localBackup and localRecovery configs

2024-03-14 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 649e2b412e10c7f698721cbbd8697f9b02248977
Author: Jinzhong Li 
AuthorDate: Wed Feb 28 18:44:30 2024 +0800

[FLINK-34484][tests] Add UT/IT tests to verify localBackup and 
localRecovery configs
---
 .../TaskExecutorLocalStateStoresManagerTest.java   | 31 +--
 .../runtime/state/TaskLocalStateStoreImplTest.java | 22 +++
 ...cutorExecutionDeploymentReconciliationTest.java |  1 +
 .../TaskExecutorPartitionLifecycleTest.java|  1 +
 .../taskexecutor/TaskExecutorSlotLifetimeTest.java |  1 +
 .../runtime/taskexecutor/TaskExecutorTest.java |  2 +
 .../TaskSubmissionTestEnvironment.java |  1 +
 .../test/checkpointing/LocalRecoveryITCase.java| 46 ++
 8 files changed, 94 insertions(+), 11 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
index 31c2f406f30..513375f3cb7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
@@ -152,7 +152,7 @@ class TaskExecutorLocalStateStoresManagerTest {
 }
 
 @Test
-void testLocalStateNoCreateDirWhenDisabledLocalRecovery() throws Exception 
{
+void testLocalStateNoCreateDirWhenDisabledLocalBackupAndRecovery() throws 
Exception {
 JobID jobID = new JobID();
 JobVertexID jobVertexID = new JobVertexID();
 AllocationID allocationID = new AllocationID();
@@ -165,9 +165,11 @@ class TaskExecutorLocalStateStoresManagerTest {
 };
 
 boolean localRecoveryEnabled = false;
+boolean localBackupEnabled = false;
 TaskExecutorLocalStateStoresManager storesManager =
 new TaskExecutorLocalStateStoresManager(
 localRecoveryEnabled,
+localBackupEnabled,
 Reference.owned(rootDirs),
 Executors.directExecutor());
 
@@ -196,6 +198,21 @@ class TaskExecutorLocalStateStoresManagerTest {
  */
 @Test
 void testSubtaskStateStoreDirectoryCreateAndDelete() throws Exception {
+testSubtaskStateStoreDirectoryCreateAndDelete(true, true);
+}
+
+@Test
+void testStateStoreDirectoryCreateAndDeleteWithLocalRecoveryEnabled() 
throws Exception {
+testSubtaskStateStoreDirectoryCreateAndDelete(true, false);
+}
+
+@Test
+void testStateStoreDirectoryCreateAndDeleteWithLocalBackupEnabled() throws 
Exception {
+testSubtaskStateStoreDirectoryCreateAndDelete(false, true);
+}
+
+private void testSubtaskStateStoreDirectoryCreateAndDelete(
+boolean localRecoveryEnabled, boolean localBackupEnabled) throws 
Exception {
 
 JobID jobID = new JobID();
 JobVertexID jobVertexID = new JobVertexID();
@@ -209,7 +226,10 @@ class TaskExecutorLocalStateStoresManagerTest {
 };
 TaskExecutorLocalStateStoresManager storesManager =
 new TaskExecutorLocalStateStoresManager(
-true, Reference.owned(rootDirs), 
Executors.directExecutor());
+localRecoveryEnabled,
+localBackupEnabled,
+Reference.owned(rootDirs),
+Executors.directExecutor());
 
 TaskLocalStateStore taskLocalStateStore =
 storesManager.localStateStoreForSubtask(
@@ -305,7 +325,10 @@ class TaskExecutorLocalStateStoresManagerTest {
 
 final TaskExecutorLocalStateStoresManager 
taskExecutorLocalStateStoresManager =
 new TaskExecutorLocalStateStoresManager(
-true, Reference.owned(localStateDirectories), 
Executors.directExecutor());
+true,
+true,
+Reference.owned(localStateDirectories),
+Executors.directExecutor());
 
 for (File localStateDirectory : localStateDirectories) {
 assertThat(localStateDirectory).exists();
@@ -327,6 +350,7 @@ class TaskExecutorLocalStateStoresManagerTest {
 
 final TaskExecutorLocalStateStoresManager 
taskExecutorLocalStateStoresManager =
 new TaskExecutorLocalStateStoresManager(
+true,
 true,
 Reference.borrowed(localStateDirectories),
 Executors.directExecutor());
@@ -348,6 +372,7 @@ class TaskExecutorLocalStateStoresManagerTest {
 final File localStateStore = 
TempDirUtils.newFolder(temporaryFolder.toPath());
 final

(flink) branch master updated (3b9623e5d2e -> 649e2b412e1)

2024-03-14 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 3b9623e5d2e [FLINK-32076][checkpoint] Introduce blocking file pool to 
reuse files (#24418)
 new 1d1dca6d6b1 [FLINK-34484][state]Move LOCAL_RECOVERY config from 
CheckpointingOptions to StateRecoveryOption
 new ed75795e978 [FLINK-34484][state] Split 'state.backend.local-recovery' 
into two options for checkpointing and recovery
 new 649e2b412e1 [FLINK-34484][tests] Add UT/IT tests to verify localBackup 
and localRecovery configs

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../generated/checkpointing_configuration.html |  8 ++--
 .../generated/common_state_backends_section.html   |  8 ++--
 .../generated/state_recovery_configuration.html|  6 +++
 .../flink/configuration/CheckpointingOptions.java  | 26 
 .../flink/configuration/StateRecoveryOptions.java  | 19 +
 .../fs/DuplicatingStateChangeFsUploader.java   | 10 ++---
 .../changelog/fs/FsStateChangelogStorage.java  |  2 +-
 .../flink/changelog/fs/FsStateChangelogWriter.java |  2 +-
 .../changelog/fs/StateChangeUploadScheduler.java   |  2 +-
 .../streaming/tests/AllroundMiniClusterTest.java   |  4 +-
 .../api/runtime/SavepointTaskStateManager.java |  2 +-
 .../DefaultSlotPoolServiceSchedulerFactory.java|  4 +-
 .../scheduler/ExecutionSlotAllocationContext.java  |  3 +-
 .../state/ChangelogTaskLocalStateStore.java|  2 +-
 .../state/CheckpointStreamWithResultProvider.java  |  2 +-
 .../flink/runtime/state/LocalRecoveryConfig.java   | 38 +++--
 ...er.java => LocalSnapshotDirectoryProvider.java} | 14 +++
 ...ava => LocalSnapshotDirectoryProviderImpl.java} | 10 ++---
 .../state/TaskExecutorLocalStateStoresManager.java | 21 ++
 .../runtime/state/TaskLocalStateStoreImpl.java | 14 ++-
 .../runtime/state/heap/HeapSnapshotStrategy.java   |  2 +-
 .../runtime/taskexecutor/TaskManagerServices.java  |  1 +
 .../TaskManagerServicesConfiguration.java  | 15 ++-
 .../runtime/util/SlotSelectionStrategyUtils.java   |  4 +-
 ...DefaultSlotPoolServiceSchedulerFactoryTest.java |  4 +-
 .../state/ChangelogTaskLocalStateStoreTest.java| 15 +++
 .../CheckpointStreamWithResultProviderTest.java|  8 ++--
 ...=> LocalSnapshotDirectoryProviderImplTest.java} | 10 ++---
 .../TaskExecutorLocalStateStoresManagerTest.java   | 36 +---
 .../runtime/state/TaskLocalStateStoreImplTest.java | 29 +++--
 .../runtime/state/TaskStateManagerImplTest.java|  7 ++--
 .../runtime/state/TestLocalRecoveryConfig.java |  6 +--
 ...cutorExecutionDeploymentReconciliationTest.java |  1 +
 .../TaskExecutorPartitionLifecycleTest.java|  1 +
 .../taskexecutor/TaskExecutorRecoveryTest.java |  4 +-
 .../taskexecutor/TaskExecutorSlotLifetimeTest.java |  1 +
 .../runtime/taskexecutor/TaskExecutorTest.java |  2 +
 .../TaskSubmissionTestEnvironment.java |  1 +
 .../util/SlotSelectionStrategyUtilsTest.java   |  6 +--
 .../state/changelog/ChangelogStateDiscardTest.java |  2 +-
 .../snapshot/RocksDBSnapshotStrategyBase.java  |  8 ++--
 .../runtime/tasks/LocalStateForwardingTest.java|  9 ++--
 .../runtime/tasks/StreamTaskTestHarness.java   |  6 +--
 .../benchmark/StateBackendBenchmarkUtils.java  |  4 +-
 .../test/checkpointing/AutoRescalingITCase.java|  3 +-
 .../ChangelogLocalRecoveryITCase.java  |  2 +-
 .../test/checkpointing/LocalRecoveryITCase.java| 48 ++
 .../NotifyCheckpointAbortedITCase.java |  4 +-
 .../ResumeCheckpointManuallyITCase.java|  3 +-
 .../flink/test/recovery/LocalRecoveryITCase.java   |  4 +-
 .../DefaultSchedulerLocalRecoveryITCase.java   |  4 +-
 .../flink/test/runtime/SchedulingITCase.java   |  6 +--
 .../test/state/ChangelogRecoveryCachingITCase.java |  2 +-
 .../flink/test/state/ChangelogRescalingITCase.java |  2 +-
 .../restore/StreamOperatorSnapshotRestoreTest.java | 13 +++---
 55 files changed, 336 insertions(+), 134 deletions(-)
 rename 
flink-runtime/src/main/java/org/apache/flink/runtime/state/{LocalRecoveryDirectoryProvider.java
 => LocalSnapshotDirectoryProvider.java} (83%)
 rename 
flink-runtime/src/main/java/org/apache/flink/runtime/state/{LocalRecoveryDirectoryProviderImpl.java
 => LocalSnapshotDirectoryProviderImpl.java} (93%)
 rename 
flink-runtime/src/test/java/org/apache/flink/runtime/state/{LocalRecoveryDirectoryProviderImplTest.java
 => LocalSnapshotDirectoryProviderImplTest.java} (93%)



(flink) branch master updated: [FLINK-34624][state/changelog] Enable local recovery in ChangelogRescalingITCase (#24470)

2024-03-11 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 94b55d1ae61 [FLINK-34624][state/changelog] Enable local recovery in 
ChangelogRescalingITCase (#24470)
94b55d1ae61 is described below

commit 94b55d1ae61257f21c7bb511660e7497f269abc7
Author: Yanfei Lei 
AuthorDate: Tue Mar 12 11:28:35 2024 +0800

[FLINK-34624][state/changelog] Enable local recovery in 
ChangelogRescalingITCase (#24470)
---
 .../streaming/util/TestStreamEnvironment.java  | 57 ++
 .../test/checkpointing/AutoRescalingITCase.java|  3 ++
 .../flink/test/state/ChangelogRescalingITCase.java |  2 +-
 3 files changed, 29 insertions(+), 33 deletions(-)

diff --git 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index e8f9f2578f6..58ddd379652 100644
--- 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -35,7 +35,6 @@ import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
 
-import static 
org.apache.flink.configuration.CheckpointingOptions.LOCAL_RECOVERY;
 import static 
org.apache.flink.runtime.testutils.PseudoRandomValueSelector.randomize;
 
 /** A {@link StreamExecutionEnvironment} that executes its jobs on {@link 
MiniCluster}. */
@@ -125,41 +124,35 @@ public class TestStreamEnvironment extends 
StreamExecutionEnvironment {
 }
 
 // randomize ITTests for enabling state change log
-if 
(isConfigurationSupportedByChangelog(miniCluster.getConfiguration())) {
-if 
(STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_ON)) {
-if 
(!conf.contains(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG)) {
-conf.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, 
true);
-miniCluster.overrideRestoreModeForChangelogStateBackend();
-}
-} else if 
(STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_RAND)) {
-boolean enabled =
-randomize(conf, 
StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, true, false);
-if (enabled) {
-// More situations about enabling periodic materialization 
should be tested
-randomize(
-conf,
-
StateChangelogOptions.PERIODIC_MATERIALIZATION_ENABLED,
-true,
-true,
-true,
-false);
-randomize(
-conf,
-
StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL,
-Duration.ofMillis(100),
-Duration.ofMillis(500),
-Duration.ofSeconds(1),
-Duration.ofSeconds(5));
-miniCluster.overrideRestoreModeForChangelogStateBackend();
-}
+if 
(STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_ON)) {
+if (!conf.contains(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG)) 
{
+conf.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, true);
+miniCluster.overrideRestoreModeForChangelogStateBackend();
+}
+} else if 
(STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_RAND)) {
+boolean enabled =
+randomize(conf, 
StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, true, false);
+if (enabled) {
+// More situations about enabling periodic materialization 
should be tested
+randomize(
+conf,
+StateChangelogOptions.PERIODIC_MATERIALIZATION_ENABLED,
+true,
+true,
+true,
+false);
+randomize(
+conf,
+
StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL,
+Duration.ofMillis(100),
+Duration.ofMillis(500),
+Duration.ofSeconds(1),
+Duration.ofSeconds(5));
+miniCluster.overrideRestoreModeForChangelogStateBackend();
 }
 }
 }
 
-private static boolean isConfigurationSupportedByChangelog(Configuration 
configuration

(flink) branch master updated: [FLINK-34483][docs] Improve the documentation of 'state.checkpoints.dir' and 'state.checkpoint-storage' (#24401)

2024-03-08 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 1d8cbe9065e [FLINK-34483][docs] Improve the documentation of 
'state.checkpoints.dir' and 'state.checkpoint-storage' (#24401)
1d8cbe9065e is described below

commit 1d8cbe9065e2ef34a748513b2469ecfe3626639e
Author: Yanfei Lei 
AuthorDate: Fri Mar 8 17:28:11 2024 +0800

[FLINK-34483][docs] Improve the documentation of 'state.checkpoints.dir' 
and 'state.checkpoint-storage' (#24401)
---
 .../generated/checkpointing_configuration.html |  4 ++--
 .../generated/common_state_backends_section.html   |  4 ++--
 .../flink/configuration/CheckpointingOptions.java  | 27 --
 3 files changed, 29 insertions(+), 6 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/checkpointing_configuration.html 
b/docs/layouts/shortcodes/generated/checkpointing_configuration.html
index 03979d7d6c9..2c40c7e1804 100644
--- a/docs/layouts/shortcodes/generated/checkpointing_configuration.html
+++ b/docs/layouts/shortcodes/generated/checkpointing_configuration.html
@@ -24,7 +24,7 @@
 state.checkpoint-storage
 (none)
 String
-The checkpoint storage implementation to be used to checkpoint 
state.The implementation can be specified either via their shortcut  
name, or via the class name of a CheckpointStorageFactory. If a factory is 
specified it is instantiated via its zero argument constructor and its CheckpointStorageFactory#createFromConfig(ReadableConfig,
 ClassLoader)  method is called.Recognized shortcut [...]
+The checkpoint storage implementation to be used to checkpoint 
state.The implementation can be specified either via their shortcut  
name, or via the class name of a CheckpointStorageFactory. If a factory is 
specified it is instantiated via its zero argument constructor and its CheckpointStorageFactory#createFromConfig(ReadableConfig,
 ClassLoader)  method is called.Recognized shortcut [...]
 
 
 state.checkpoint.cleaner.parallel-mode
@@ -42,7 +42,7 @@
 state.checkpoints.dir
 (none)
 String
-The default directory used for storing the data files and meta 
data of checkpoints in a Flink supported filesystem. The storage path must be 
accessible from all participating processes/nodes(i.e. all TaskManagers and 
JobManagers).
+The default directory used for storing the data files and meta 
data of checkpoints in a Flink supported filesystem. The storage path must be 
accessible from all participating processes/nodes(i.e. all TaskManagers and 
JobManagers). If the 'state.checkpoint-storage' is set to 'jobmanager', only 
the meta data of checkpoints will be stored in this directory.
 
 
 state.checkpoints.num-retained
diff --git 
a/docs/layouts/shortcodes/generated/common_state_backends_section.html 
b/docs/layouts/shortcodes/generated/common_state_backends_section.html
index 157cb75c81f..7d35697849c 100644
--- a/docs/layouts/shortcodes/generated/common_state_backends_section.html
+++ b/docs/layouts/shortcodes/generated/common_state_backends_section.html
@@ -18,13 +18,13 @@
 state.checkpoint-storage
 (none)
 String
-The checkpoint storage implementation to be used to checkpoint 
state.The implementation can be specified either via their shortcut  
name, or via the class name of a CheckpointStorageFactory. If a factory is 
specified it is instantiated via its zero argument constructor and its CheckpointStorageFactory#createFromConfig(ReadableConfig,
 ClassLoader)  method is called.Recognized shortcut [...]
+The checkpoint storage implementation to be used to checkpoint 
state.The implementation can be specified either via their shortcut  
name, or via the class name of a CheckpointStorageFactory. If a factory is 
specified it is instantiated via its zero argument constructor and its CheckpointStorageFactory#createFromConfig(ReadableConfig,
 ClassLoader)  method is called.Recognized shortcut [...]
 
 
 state.checkpoints.dir
 (none)
 String
-The default directory used for storing the data files and meta 
data of checkpoints in a Flink supported filesystem. The storage path must be 
accessible from all participating processes/nodes(i.e. all TaskManagers and 
JobManagers).
+The default directory used for storing the data files and meta 
data of checkpoints in a Flink supported filesystem. The storage path must be 
accessible from all participating processes/nodes(i.e. all TaskManagers and 
JobManagers). If the 'state.checkpoint-storage' is set to 'jobma

(flink) branch master updated (e7e973e212d -> 80090c76f88)

2024-02-22 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from e7e973e212d [FLINK-34458][checkpointing] Rename options for 
Generalized incremental checkpoints (changelog) (#24324)
 add 80090c76f88 [FLINK-34455] Move RestoreMode from flink-runtime to 
flink-core (#24320)

No new revisions were added by this update.

Summary of changes:
 .../src/main/java/org/apache/flink/client/cli/CliFrontendParser.java| 2 +-
 .../src/main/java/org/apache/flink/client/cli/ProgramOptions.java   | 2 +-
 .../src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java   | 2 +-
 .../container/entrypoint/StandaloneApplicationClusterEntryPoint.java| 2 +-
 .../src/main/java/org/apache/flink/core/execution}/RestoreMode.java | 2 +-
 .../highavailability/KubernetesCheckpointRecoveryFactory.java   | 2 +-
 .../main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java| 2 +-
 .../org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java | 2 +-
 .../org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java | 2 +-
 .../flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java   | 2 +-
 .../apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java | 2 +-
 .../java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java  | 2 +-
 .../org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java  | 2 +-
 .../java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java   | 2 +-
 .../flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java  | 2 +-
 .../flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java   | 2 +-
 .../flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java   | 2 +-
 .../flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java| 2 +-
 .../flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java| 2 +-
 .../runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java| 2 +-
 .../java/org/apache/flink/runtime/jobgraph/SavepointConfigOptions.java  | 1 +
 .../org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java | 1 +
 .../src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java | 2 +-
 .../main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java| 2 +-
 .../main/java/org/apache/flink/runtime/state/SharedStateRegistry.java   | 2 +-
 .../java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java | 2 +-
 .../java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java| 2 +-
 .../src/main/java/org/apache/flink/runtime/state/StateBackend.java  | 2 +-
 .../src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java | 2 +-
 .../flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java  | 2 +-
 .../flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java| 2 +-
 .../org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java  | 2 +-
 .../flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java   | 2 +-
 .../org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java| 2 +-
 .../flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java   | 2 +-
 .../apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryTest.java   | 2 +-
 .../flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java  | 2 +-
 .../runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java | 2 +-
 .../flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java | 2 +-
 .../org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java| 2 +-
 .../dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java| 2 +-
 .../org/apache/flink/runtime/jobgraph/SavepointRestoreSettingsTest.java | 2 ++
 .../java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java | 2 +-
 .../java/org/apache/flink/runtime/state/SharedStateRegistryTest.java| 2 +-
 .../main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java  | 2 +-
 .../flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java  | 2 +-
 .../test/checkpointing/ChangelogRecoverySwitchStateBackendITCase.java   | 2 +-
 .../apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java | 2 +-
 .../java/org/apache/flink/test/checkpointing/SavepointFormatITCase.java | 2 +-
 .../test/java/org/apache/flink/test/checkpointing/SavepointITCase.java  | 2 +-
 50 files changed, 51 insertions(+), 47 deletions(-)
 rename {flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph => 
flink-core/src/main/java/org/apache/flink/core/execution}/RestoreMode.java (98%)



(flink) branch master updated (6f7e841dd4b -> 26817092445)

2024-02-21 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 6f7e841dd4b [FLINK-34479][documentation] Fix missed changelog configs 
in the documentation (#24354)
 add 26817092445 [FLINK-32443][docs-zh] Translate "State Processor API" 
page into Chinese (#23496)

No new revisions were added by this update.

Summary of changes:
 docs/content.zh/docs/libs/state_processor_api.md | 168 ++-
 1 file changed, 74 insertions(+), 94 deletions(-)



(flink) branch master updated: [FLINK-33863] Fix restoring compressed operator state (#23938)

2023-12-20 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new d415d93bbf9 [FLINK-33863] Fix restoring compressed operator state 
(#23938)
d415d93bbf9 is described below

commit d415d93bbf9620ba985136469107edd8c6e31cc6
Author: Ruibin Xing 
AuthorDate: Wed Dec 20 18:04:44 2023 +0800

[FLINK-33863] Fix restoring compressed operator state (#23938)
---
 .../state/OperatorStateRestoreOperation.java   |  30 +-
 .../state/OperatorStateRestoreOperationTest.java   | 114 +
 2 files changed, 142 insertions(+), 2 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java
index f818eb81978..fd983fd5d28 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java
@@ -32,9 +32,12 @@ import org.apache.commons.io.IOUtils;
 import javax.annotation.Nonnull;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /** Implementation of operator state restore operation. */
 public class OperatorStateRestoreOperation implements RestoreOperation {
@@ -168,9 +171,32 @@ public class OperatorStateRestoreOperation implements 
RestoreOperation {
 }
 }
 
+List> 
entries =
+new 
ArrayList<>(stateHandle.getStateNameToPartitionOffsets().entrySet());
+
+if (backendSerializationProxy.isUsingStateCompression()) {
+// sort state handles by offsets to avoid building 
SnappyFramedInputStream with
+// EOF stream.
+entries =
+entries.stream()
+.sorted(
+Comparator.comparingLong(
+entry -> {
+
OperatorStateHandle.StateMetaInfo
+stateMetaInfo 
= entry.getValue();
+long[] offsets = 
stateMetaInfo.getOffsets();
+if (offsets == null
+|| 
offsets.length == 0) {
+return 
Long.MIN_VALUE;
+} else {
+return offsets[0];
+}
+}))
+.collect(Collectors.toList());
+}
+
 // Restore all the states
-for (Map.Entry 
nameToOffsets :
-
stateHandle.getStateNameToPartitionOffsets().entrySet()) {
+for (Map.Entry 
nameToOffsets : entries) {
 
 final String stateName = nameToOffsets.getKey();
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateRestoreOperationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateRestoreOperationTest.java
new file mode 100644
index 000..e0aecd5d723
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateRestoreOperationTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.comm

(flink-benchmarks) branch master updated: [FLINK-33482] Flink benchmark regression check in the machines hosted on Aliyun (#81)

2023-11-14 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git


The following commit(s) were added to refs/heads/master by this push:
 new a6fe13c  [FLINK-33482] Flink benchmark regression check in the 
machines hosted on Aliyun (#81)
a6fe13c is described below

commit a6fe13cc55499f4f26516aebbd8beb71e94cec88
Author: Zakelly 
AuthorDate: Wed Nov 15 12:37:50 2023 +0800

[FLINK-33482] Flink benchmark regression check in the machines hosted on 
Aliyun (#81)
---
 jenkinsfiles/regression-check.jenkinsfile |  4 ++--
 regression_report.py  | 36 +--
 regression_report_v2.py   | 18 ++--
 3 files changed, 47 insertions(+), 11 deletions(-)

diff --git a/jenkinsfiles/regression-check.jenkinsfile 
b/jenkinsfiles/regression-check.jenkinsfile
index b769a1f..1cbde9f 100644
--- a/jenkinsfiles/regression-check.jenkinsfile
+++ b/jenkinsfiles/regression-check.jenkinsfile
@@ -18,10 +18,10 @@
 timestamps {
 try {
 timeout(time: 3, unit: 'HOURS') { // includes waiting for a machine
-node('Hetzner') {
+node('Aliyun') {
 dir('flink-benchmarks') {
 git url: 'https://github.com/apache/flink-benchmarks.git', 
branch: 'master'
-sh './regression_report_v2.py > regression-report'
+sh 'python2 ./regression_report_v2.py > regression-report'
 def alerts = readFile "regression-report"
 if (alerts) {
  def attachments = [
diff --git a/regression_report.py b/regression_report.py
index fdbdcd7..1ddcedd 100755
--- a/regression_report.py
+++ b/regression_report.py
@@ -25,8 +25,9 @@ import argparse
 import json
 import re
 
-DEFAULT_CODESPEED_URL = 'http://codespeed.dak8s.net:8000/'
-ENVIRONMENT = 2
+DEFAULT_CODESPEED_URL = 'http://flink-speed.xyz/'
+ENVIRONMENT = 3
+ENVNAME='Aliyun'
 
 current_date = datetime.datetime.today()
 
@@ -69,6 +70,37 @@ def loadExecutableAndRevisions(codespeedUrl):
 revisions[exe] = rev
 return revisions
 
+"""
+Returns a dict executable id -> executable name
+"""
+def loadExecutableNames(codespeedUrl):
+names = {}
+url = codespeedUrl + 'reports'
+f = urllib2.urlopen(url)
+response = f.read()
+f.close()
+for line in response.split('\n'):
+# Find urls like: 
/changes/?rev=b8e7fc387dd-ffcdbb4-1647231150&exe=1&env=Hetzner
+# and extract rev and exe params out of it
+reports = dict(re.findall(r'([a-z]+)=([a-z0-9\-]+)', line))
+if "exe" in reports and "rev" in reports:
+exe = reports["exe"]
+name = re.findall('([A-Za-z0-9\-\ \(\)]+)\@' + ENVNAME + 
'\<\/td\>', line)
+# remember only the first (latest) revision for the given 
executable
+if exe not in names and len(name) > 0:
+names[exe] = name[0]
+return names
+
+"""
+Returns the Java version from the executable name
+"""
+def extractJavaVersion(name):
+result = re.findall('(\Java[0-9]+)', name)
+if len(result) > 0:
+return result[0]
+else:
+return "Java8"
+
 """
 Returns a dict executable -> benchmark names 
 """
diff --git a/regression_report_v2.py b/regression_report_v2.py
index f0389f4..9ce50f5 100755
--- a/regression_report_v2.py
+++ b/regression_report_v2.py
@@ -22,7 +22,11 @@ import json
 import urllib
 import urllib2
 
+from regression_report import DEFAULT_CODESPEED_URL
+from regression_report import ENVIRONMENT
 from regression_report import loadBenchmarkNames
+from regression_report import loadExecutableNames
+from regression_report import extractJavaVersion
 
 """
 The regression detection algorithm calculates the regression ratio as the 
ratio of change between the current
@@ -32,7 +36,6 @@ triggered if the regression ratio exceeds 
max(minRegressionRatio, minInstability
 Please refer to 
https://docs.google.com/document/d/1Bvzvq79Ll5yxd1UtC0YzczgFbZPAgPcN3cI0MjVkIag 
for more detail.
 """
 
-ENVIRONMENT = 2
 MIN_SAMPLE_SIZE_LIMIT = 5
 
 """
@@ -50,16 +53,16 @@ def loadHistoryData(codespeedUrl, exe, benchmark, 
baselineSize):
 return result, lessIsbBetter
 
 def detectRegression(urlToBenchmark, stds, scores, baselineSize, 
minRegressionRatio, minInstabilityMultiplier,
- direction):
+ direction, execName):
 
 sustainable_x = [min(scores[i - 3: i]) for i in range(3, min(len(scores), 
bas

[flink-benchmarks] branch master updated (3013320 -> 60e3522)

2023-10-08 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git


from 3013320  Bump Flink verion to 1.19-SNAPSHOT
 add 60e3522  [FLINK-33161] Java17 profile for benchmarking (#80)

No new revisions were added by this update.

Summary of changes:
 README.md |  6 +
 pom.xml   | 87 +++
 2 files changed, 93 insertions(+)



[flink] branch master updated: [FLINK-32072][checkpoint] Create and wire FileMergingSnapshotManager with TaskManagerServices (#22890)

2023-09-19 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 1112582fd13 [FLINK-32072][checkpoint] Create and wire 
FileMergingSnapshotManager with TaskManagerServices (#22890)
1112582fd13 is described below

commit 1112582fd136df47c6d356d6f6ad3946ad1e56d5
Author: Yanfei Lei <18653940+fre...@users.noreply.github.com>
AuthorDate: Tue Sep 19 17:31:04 2023 +0800

[FLINK-32072][checkpoint] Create and wire FileMergingSnapshotManager with 
TaskManagerServices (#22890)
---
 .../api/runtime/SavepointTaskStateManager.java |   7 +
 .../filemerging/FileMergingSnapshotManager.java|   6 +-
 .../FileMergingSnapshotManagerBuilder.java |   2 -
 .../runtime/state/CheckpointStorageWorkerView.java |  13 ++
 .../state/TaskExecutorFileMergingManager.java  | 143 +
 .../flink/runtime/state/TaskStateManager.java  |   3 +
 .../flink/runtime/state/TaskStateManagerImpl.java  |  14 ++
 .../filesystem/FsCheckpointStorageAccess.java  |  24 +++-
 .../FsMergingCheckpointStorageAccess.java  |  73 +++
 .../flink/runtime/taskexecutor/TaskExecutor.java   |  14 ++
 .../runtime/taskexecutor/TaskManagerServices.java  |  13 ++
 .../state/TaskExecutorFileMergingManagerTest.java  |  87 +
 .../runtime/state/TaskStateManagerImplTest.java|   4 +
 .../flink/runtime/state/TestTaskStateManager.java  |   7 +
 .../taskexecutor/TaskManagerServicesBuilder.java   |  10 ++
 .../runtime/util/JvmExitOnFatalErrorTest.java  |   1 +
 .../flink/streaming/runtime/tasks/StreamTask.java  |  29 -
 .../StateInitializationContextImplTest.java|   1 +
 .../StreamTaskStateInitializerImplTest.java|   1 +
 .../runtime/tasks/LocalStateForwardingTest.java|   1 +
 .../streaming/runtime/tasks/StreamTaskTest.java|   2 +
 21 files changed, 446 insertions(+), 9 deletions(-)

diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java
index cb6193e7724..a1d649d4d8c 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor;
 import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import 
org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReader;
+import 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.TaskStateManager;
@@ -114,6 +115,12 @@ final class SavepointTaskStateManager implements 
TaskStateManager {
 return null;
 }
 
+@Nullable
+@Override
+public FileMergingSnapshotManager getFileMergingSnapshotManager() {
+return null;
+}
+
 @Override
 public void notifyCheckpointComplete(long checkpointId) {
 throw new UnsupportedOperationException(MSG);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
index 4954d8548f3..968e86f91c5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.runtime.checkpoint.filemerging;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
@@ -31,7 +32,7 @@ import java.io.Closeable;
  * FileMergingSnapshotManager provides an interface to manage files and meta 
information for
  * checkpoint files with merging checkpoint files enabled. It manages the 
files for ONE single task
  * in TM, including all subtasks of this single task that running in this TM. 
There is one
- * FileMergingSnapshotManager for each task per task manager.
+ * FileMergingSnapshotManager for each job per task manager.
  *
  * TODO (FLINK-32073): create output stream.
  *
@@ -125,7 +126,8 @@ public interface FileMergingSnapshotManager extends 
Closeable {
 taskInfo.getNumberOfParallelSu

[flink] branch master updated: [FLINK-32769][hotfix][state-changelog] Fix the invalid placeholder {} passed to MailboxExecutor#execute() in PeriodicMaterializationManager (#23153)

2023-09-15 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 13469792821 [FLINK-32769][hotfix][state-changelog] Fix the invalid 
placeholder {} passed to MailboxExecutor#execute() in 
PeriodicMaterializationManager (#23153)
13469792821 is described below

commit 13469792821ee1e9d2171a28a071cc986626316e
Author: Wang FeiFan 
AuthorDate: Fri Sep 15 16:50:58 2023 +0800

[FLINK-32769][hotfix][state-changelog] Fix the invalid placeholder {} 
passed to MailboxExecutor#execute() in PeriodicMaterializationManager (#23153)

Co-authored-by: Yanfei Lei <18653940+fre...@users.noreply.github.com>
---
 .../org/apache/flink/state/common/PeriodicMaterializationManager.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/PeriodicMaterializationManager.java
 
b/flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/PeriodicMaterializationManager.java
index 68bee356208..7989ea31cfb 100644
--- 
a/flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/PeriodicMaterializationManager.java
+++ 
b/flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/PeriodicMaterializationManager.java
@@ -267,7 +267,7 @@ public class PeriodicMaterializationManager implements 
Closeable {
 
metrics.reportFailedMaterialization();
 }
 },
-"Task {} update materializedSnapshot 
up to changelog sequence number: {}",
+"Task %s update materializedSnapshot 
up to changelog sequence number: %s.",
 subtaskName,
 upTo);
 
@@ -310,7 +310,7 @@ public class PeriodicMaterializationManager implements 
Closeable {
 () ->
 target.handleMaterializationFailureOrCancellation(
 materializationId, upTo, cause),
-"Task {} materialization:{},upTo:{} failed or canceled.",
+"Task %s materialization: %d, upTo: %s, failed or canceled.",
 subtaskName,
 materializationId,
 upTo);



[flink] branch master updated: [FLINK-30863][state/changelog] Register local recovery files of changelog before notifyCheckpointComplete()

2023-08-29 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new c23a3002b83 [FLINK-30863][state/changelog] Register local recovery 
files of changelog before notifyCheckpointComplete()
c23a3002b83 is described below

commit c23a3002b837ecffa2d561a405c17c979c814c6e
Author: Yanfei Lei <18653940+fre...@users.noreply.github.com>
AuthorDate: Tue Aug 29 16:37:16 2023 +0800

[FLINK-30863][state/changelog] Register local recovery files of changelog 
before notifyCheckpointComplete()

This closes https://github.com/apache/flink/pull/21822
---
 .../fs/DuplicatingStateChangeFsUploader.java   |  12 +-
 .../changelog/fs/FsStateChangelogStorage.java  |   2 +
 .../flink/changelog/fs/FsStateChangelogWriter.java |  48 ++--
 .../changelog/fs/ChangelogStorageMetricsTest.java  |  16 +-
 .../fs/DiscardRecordableStateChangeUploader.java   |   3 +-
 .../fs/FsStateChangelogWriterSqnTest.java  |   2 +-
 .../changelog/fs/FsStateChangelogWriterTest.java   | 263 +++--
 .../state/changelog/LocalChangelogRegistry.java|  15 +-
 .../changelog/LocalChangelogRegistryImpl.java  |  26 +-
 .../state/changelog/StateChangelogWriter.java  |  11 +-
 .../inmemory/InMemoryStateChangelogWriter.java |   5 +-
 .../runtime/state/TestLocalRecoveryConfig.java |   4 +
 .../changelog/LocalChangelogRegistryTest.java  |   2 +-
 .../inmemory/StateChangelogStorageTest.java|   2 +-
 .../changelog/ChangelogKeyedStateBackend.java  |   2 +-
 .../state/changelog/ChangelogTruncateHelper.java   |   1 -
 .../state/changelog/StateChangeLoggerTestBase.java |   6 +-
 17 files changed, 319 insertions(+), 101 deletions(-)

diff --git 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java
 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java
index cf99ed17683..50db749b973 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java
@@ -26,6 +26,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.ChangelogTaskLocalStateStore;
 import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
 import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.changelog.LocalChangelogRegistry;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 
 import org.slf4j.Logger;
@@ -51,14 +52,13 @@ import static org.apache.flink.runtime.state.ChangelogTaskLocalStateStore.getLoc
  *   Store the meta of files into {@link ChangelogTaskLocalStateStore} by
  *   AsyncCheckpointRunnable#reportCompletedSnapshotStates().
  *   Pass control of the file to {@link LocalChangelogRegistry#register} when
- *   ChangelogKeyedStateBackend#notifyCheckpointComplete() , files of the previous
- *   checkpoint will be deleted by {@link LocalChangelogRegistry#discardUpToCheckpoint} at
- *   the same time.
+ *   FsStateChangelogWriter#persist , files of the previous checkpoint will be deleted by
+ *   {@link LocalChangelogRegistry#discardUpToCheckpoint} when the checkpoint is confirmed.
  *   When ChangelogTruncateHelper#materialized() or
  *   ChangelogTruncateHelper#checkpointSubsumed() is called, {@link
- *   TaskChangelogRegistry#notUsed} is responsible for deleting local files.
- *   When one checkpoint is aborted, the dstl files of this checkpoint will be deleted by
- *   {@link LocalChangelogRegistry#prune} in {@link FsStateChangelogWriter#reset}.
+ *   TaskChangelogRegistry#release} is responsible for deleting local files.
+ *   When one checkpoint is aborted, all accumulated local dstl files will be deleted at
+ *   once.
  * 
  */
 public class DuplicatingStateChangeFsUploader extends AbstractStateChangeFsUploader {
diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java
index 5ba18f701ea..1d4668cb8e8 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.state.changelog.LocalChangelogRegistry;
 import org.apache.flink.runtime.state.changelog.LocalChangelogRegistryImpl;
 import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
 
+