[jira] [Created] (FLINK-34119) Improve description about changelog in document

2024-01-16 Thread Hangxiang Yu (Jira)
Hangxiang Yu created FLINK-34119:


 Summary: Improve description about changelog in document
 Key: FLINK-34119
 URL: https://issues.apache.org/jira/browse/FLINK-34119
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Reporter: Hangxiang Yu
Assignee: Hangxiang Yu


Since we have resolved some issues and marked changelog as production-ready in the [release 
note|#generalized-incremental-checkpoint], we could update its description in the docs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-33679) RestoreMode uses NO_CLAIM as default instead of LEGACY

2024-01-16 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu resolved FLINK-33679.
--
Fix Version/s: 1.19.0
 Assignee: Zakelly Lan
   Resolution: Fixed

merged fb7324b0 into master

> RestoreMode uses NO_CLAIM as default instead of LEGACY
> --
>
> Key: FLINK-33679
> URL: https://issues.apache.org/jira/browse/FLINK-33679
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Runtime / State Backends
>Reporter: junzhong qin
>Assignee: Zakelly Lan
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> RestoreMode uses NO_CLAIM as default instead of LEGACY.
> {code:java}
> public enum RestoreMode implements DescribedEnum {
>     CLAIM(
>             "Flink will take ownership of the given snapshot. It will clean the"
>                     + " snapshot once it is subsumed by newer ones."),
>     NO_CLAIM(
>             "Flink will not claim ownership of the snapshot files. However it will make sure it"
>                     + " does not depend on any artefacts from the restored snapshot. In order to do that,"
>                     + " Flink will take the first checkpoint as a full one, which means it might"
>                     + " reupload/duplicate files that are part of the restored checkpoint."),
>     LEGACY(
>             "This is the mode in which Flink worked so far. It will not claim ownership of the"
>                     + " snapshot and will not delete the files. However, it can directly depend on"
>                     + " the existence of the files of the restored checkpoint. It might not be safe"
>                     + " to delete checkpoints that were restored in legacy mode ");
>
>     private final String description;
>
>     RestoreMode(String description) {
>         this.description = description;
>     }
>
>     @Override
>     @Internal
>     public InlineElement getDescription() {
>         return text(description);
>     }
>
>     public static final RestoreMode DEFAULT = NO_CLAIM;
> }
> {code}
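
To make the effect of the changed default concrete, here is a minimal, self-contained sketch. It does not use Flink classes: RestoreModeSketch, resolve and RestoreModeDemo are hypothetical names introduced for illustration, and only the three mode names and DEFAULT = NO_CLAIM come from the quoted enum.

```java
import java.util.Locale;

/** Simplified stand-in for the quoted enum; descriptions are omitted. */
enum RestoreModeSketch {
    CLAIM,
    NO_CLAIM,
    LEGACY;

    /** Mirrors the quoted change: NO_CLAIM, not LEGACY, is the default. */
    static final RestoreModeSketch DEFAULT = NO_CLAIM;

    /** Hypothetical helper: fall back to DEFAULT when no mode is configured. */
    static RestoreModeSketch resolve(String configured) {
        if (configured == null || configured.trim().isEmpty()) {
            return DEFAULT;
        }
        return valueOf(configured.trim().toUpperCase(Locale.ROOT));
    }
}

class RestoreModeDemo {
    public static void main(String[] args) {
        System.out.println(RestoreModeSketch.resolve(null));    // NO_CLAIM
        System.out.println(RestoreModeSketch.resolve("claim")); // CLAIM
    }
}
```

With this default, a restore that does not name a mode no longer depends on the files of the restored snapshot, as the NO_CLAIM description above states.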





[jira] [Assigned] (FLINK-29802) ChangelogStateBackend supports native savepoint

2024-01-15 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29802?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu reassigned FLINK-29802:


Assignee: Hangxiang Yu

> ChangelogStateBackend supports native savepoint
> ---
>
> Key: FLINK-29802
> URL: https://issues.apache.org/jira/browse/FLINK-29802
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Hangxiang Yu
>Assignee: Hangxiang Yu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>






[jira] [Resolved] (FLINK-34078) Move InternalKeyContext classes from o.a.f.runtime.state.heap to o.a.f.runtime.state package

2024-01-15 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34078?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu resolved FLINK-34078.
--
Fix Version/s: 1.19.0
   Resolution: Fixed

merged 6bdb4f75 into master

> Move InternalKeyContext classes from o.a.f.runtime.state.heap to 
> o.a.f.runtime.state package
> 
>
> Key: FLINK-34078
> URL: https://issues.apache.org/jira/browse/FLINK-34078
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Jinzhong Li
>Assignee: Jinzhong Li
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.19.0
>
> Attachments: image-2024-01-15-12-57-12-667.png
>
>
> h3. Motivation:
> When the RocksDB state backend throws an illegal keyGroup check exception, 
> the exception stack contains a class scoped to the heap state backend, which looks 
> strange to users.
> !image-2024-01-15-12-57-12-667.png|width=555,height=68!
> h3. Proposed changes:
> InternalKeyContext and InternalKeyContextImpl are commonly used by all state 
> backends (heap/rocksdb/changelog), so they should be moved from the 
> org.apache.flink.runtime.state.heap package to the org.apache.flink.runtime.state 
> package.
> h3. Compatibility:
> InternalKeyContext is annotated with @Internal, so this change has no 
> compatibility issues.
>  
>  





[jira] [Assigned] (FLINK-34078) Move InternalKeyContext classes from o.a.f.runtime.state.heap to o.a.f.runtime.state package

2024-01-14 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34078?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu reassigned FLINK-34078:


Assignee: Jinzhong Li

> Move InternalKeyContext classes from o.a.f.runtime.state.heap to 
> o.a.f.runtime.state package
> 
>
> Key: FLINK-34078
> URL: https://issues.apache.org/jira/browse/FLINK-34078
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Jinzhong Li
>Assignee: Jinzhong Li
>Priority: Minor
> Attachments: image-2024-01-15-12-57-12-667.png
>
>


[jira] [Commented] (FLINK-34078) Move InternalKeyContext classes from o.a.f.runtime.state.heap to o.a.f.runtime.state package

2024-01-14 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806660#comment-17806660
 ] 

Hangxiang Yu commented on FLINK-34078:
--

Thanks for reporting this.

It makes sense to move it to the outer package.

Already assigned to you, please go ahead.

> Move InternalKeyContext classes from o.a.f.runtime.state.heap to 
> o.a.f.runtime.state package
> 
>
> Key: FLINK-34078
> URL: https://issues.apache.org/jira/browse/FLINK-34078
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Jinzhong Li
>Priority: Minor
> Attachments: image-2024-01-15-12-57-12-667.png
>
>


[jira] [Assigned] (FLINK-34050) Rocksdb state has space amplification after rescaling with DeleteRange

2024-01-14 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu reassigned FLINK-34050:


Assignee: Jinzhong Li

> Rocksdb state has space amplification after rescaling with DeleteRange
> --
>
> Key: FLINK-34050
> URL: https://issues.apache.org/jira/browse/FLINK-34050
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Jinzhong Li
>Assignee: Jinzhong Li
>Priority: Major
> Attachments: image-2024-01-10-21-23-48-134.png, 
> image-2024-01-10-21-24-10-983.png, image-2024-01-10-21-28-24-312.png
>
>
> FLINK-21321 uses deleteRange to speed up RocksDB rescaling; however, it 
> causes space amplification in some cases.
> We can reproduce this problem with a wordCount job:
> 1) before rescaling, the state operator in the wordCount job has a parallelism of 2 and 
> a full checkpoint size of 4G+;
> !image-2024-01-10-21-24-10-983.png|width=266,height=130!
> 2) then restart the job with a parallelism of 4 (for the state operator); the full 
> checkpoint size of the new job will be 8G+;
> 3) after many successful checkpoints, the full checkpoint size is still 8G+;
> !image-2024-01-10-21-28-24-312.png|width=454,height=111!
>  
> The root cause of this issue is that the deleted keyGroupRange does not 
> overlap with the current DB keyGroupRange, so new data written into RocksDB after 
> rescaling is almost never compacted together with the deleted data (which belongs to 
> other keyGroupRanges).
>  
> The space amplification may affect RocksDB read performance and disk 
> space usage after rescaling. It looks like a regression caused by the 
> introduction of deleteRange as a rescaling optimization.
>  
> To solve this problem, maybe we can invoke 
> RocksDB.deleteFilesInRanges after deleteRange?
> {code:java}
> public static void clipDBWithKeyGroupRange() {
>   //...
>   List<byte[]> ranges = new ArrayList<>();
>   //...
>   deleteRange(db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes);
>   // deleteFilesInRanges takes the boundaries as consecutive (begin, end) pairs
>   ranges.add(beginKeyGroupBytes);
>   ranges.add(endKeyGroupBytes);
>   //...
>   for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
>     // drop SST files that lie entirely inside the deleted ranges (end key excluded)
>     db.deleteFilesInRanges(columnFamilyHandle, ranges, false);
>   }
> }
> {code}
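
The reasoning above can be illustrated with a toy model that uses no RocksDB classes. SstFileSketch, ClipSketch and the sizes are hypothetical; the only assumption taken from the proposal is that whole files lying entirely inside the deleted key-group range can be dropped immediately, while a range tombstone alone leaves every file on disk until compaction reaches it.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of an SST file that covers an inclusive key-group interval. */
final class SstFileSketch {
    final int minKeyGroup;
    final int maxKeyGroup;
    final long sizeBytes;

    SstFileSketch(int minKeyGroup, int maxKeyGroup, long sizeBytes) {
        this.minKeyGroup = minKeyGroup;
        this.maxKeyGroup = maxKeyGroup;
        this.sizeBytes = sizeBytes;
    }

    /** True if every key group stored in this file lies inside [from, to]. */
    boolean isFullyInside(int from, int to) {
        return minKeyGroup >= from && maxKeyGroup <= to;
    }
}

final class ClipSketch {
    /** Keeps every file that is not entirely contained in the deleted interval [from, to]. */
    static List<SstFileSketch> dropFilesInRange(List<SstFileSketch> files, int from, int to) {
        List<SstFileSketch> kept = new ArrayList<>();
        for (SstFileSketch file : files) {
            if (!file.isFullyInside(from, to)) {
                kept.add(file);
            }
        }
        return kept;
    }

    /** Sums the on-disk size of the given files. */
    static long totalSize(List<SstFileSketch> files) {
        long sum = 0;
        for (SstFileSketch file : files) {
            sum += file.sizeBytes;
        }
        return sum;
    }

    public static void main(String[] args) {
        // The restored DB covered key groups 0..63; after rescaling this subtask keeps 0..31.
        List<SstFileSketch> files = new ArrayList<>();
        files.add(new SstFileSketch(0, 31, 100));  // still owned after rescaling
        files.add(new SstFileSketch(32, 63, 100)); // entirely inside the deleted range
        files.add(new SstFileSketch(24, 40, 50));  // straddles the boundary

        // With a range tombstone only, all files stay until compaction touches them.
        System.out.println(totalSize(files)); // 250
        // Dropping whole files inside the deleted range reclaims space right away.
        System.out.println(totalSize(dropFilesInRange(files, 32, 63))); // 150
    }
}
```

The straddling file survives in both cases, which matches the point made later in the thread that the remaining files still contain ranges the current keys and compaction will access.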
>  





[jira] [Commented] (FLINK-34050) Rocksdb state has space amplification after rescaling with DeleteRange

2024-01-14 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806659#comment-17806659
 ] 

Hangxiang Yu commented on FLINK-34050:
--

[~lijinzhong] Yeah, I think this benchmark result should be enough.

> Rocksdb state has space amplification after rescaling with DeleteRange
> --
>
> Key: FLINK-34050
> URL: https://issues.apache.org/jira/browse/FLINK-34050
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Jinzhong Li
>Priority: Major
> Attachments: image-2024-01-10-21-23-48-134.png, 
> image-2024-01-10-21-24-10-983.png, image-2024-01-10-21-28-24-312.png
>
>


[jira] [Resolved] (FLINK-33057) Add options to disable creating job-id subdirectories under the checkpoint directory

2024-01-14 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33057?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu resolved FLINK-33057.
--
Fix Version/s: 1.19.0
   Resolution: Fixed

merged 73b036c3 into master

> Add options to disable creating job-id subdirectories under the checkpoint 
> directory
> 
>
> Key: FLINK-33057
> URL: https://issues.apache.org/jira/browse/FLINK-33057
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.19.0
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> By default, Flink creates a subdirectory named by UUID (job id) under the 
> checkpoint directory for each job. This is a good way to avoid collisions. 
> However, it also takes some effort to remember or find the right directory 
> when recovering from a previous checkpoint. According to previous discussions 
> ([Yun 
> Tang's|https://issues.apache.org/jira/browse/FLINK-11789?focusedCommentId=16782314=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16782314]
>  and [Stephan 
> Ewen's|https://issues.apache.org/jira/browse/FLINK-9043?focusedCommentId=16409254=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16409254]
>  ), I think it would be useful to add an option to disable creating the UUID 
> subdirectories under the checkpoint directory. For compatibility, 
> the subdirectories are still created by default.
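
A small sketch of what the option changes, using plain strings rather than Flink's path classes. CheckpointDirSketch, resolve, the createJobIdSubdir flag and the example paths are hypothetical names and values chosen for illustration; the issue text does not name the option.

```java
final class CheckpointDirSketch {
    /**
     * Hypothetical helper: returns the directory a job writes its checkpoints to.
     * The boolean stands in for the proposed option; its name is an assumption.
     */
    static String resolve(String baseDir, String jobId, boolean createJobIdSubdir) {
        String base = baseDir.endsWith("/")
                ? baseDir.substring(0, baseDir.length() - 1)
                : baseDir;
        return createJobIdSubdir ? base + "/" + jobId : base;
    }

    public static void main(String[] args) {
        String jobId = "job-1234"; // placeholder, not a real job id
        // Default: one subdirectory per job, which avoids collisions.
        System.out.println(resolve("hdfs:///flink/checkpoints", jobId, true));
        // Option disabled: checkpoints land directly in the configured directory.
        System.out.println(resolve("hdfs:///flink/checkpoints", jobId, false));
    }
}
```

With the subdirectory disabled, the location is predictable when recovering, but two jobs pointed at the same base directory would share it, so the collision protection mentioned in the description no longer applies.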





[jira] [Resolved] (FLINK-30613) Improve resolving schema compatibility -- Milestone one

2024-01-14 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu resolved FLINK-30613.
--
Resolution: Fixed

merged 13921a08...0e5de813 into master

> Improve resolving schema compatibility -- Milestone one
> ---
>
> Key: FLINK-30613
> URL: https://issues.apache.org/jira/browse/FLINK-30613
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Type Serialization System
>Reporter: Hangxiang Yu
>Assignee: Hangxiang Yu
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.19.0
>
>
> In milestone one, we should:
>  # Add an extra method 
> (TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot 
> oldSerializerSnapshot)) to TypeSerializerSnapshot.java as above, and return 
> INCOMPATIBLE by default.
>  # Mark the original method as deprecated; by default it will use the new method to 
> resolve compatibility.
>  # Implement the new method for all built-in TypeSerializerSnapshots.
> See FLIP-263 for more details.
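
The three steps can be sketched with simplified stand-in types. Nothing below is Flink code: CompatibilitySketch, SerializerSketch, SnapshotSketch and the Int/Legacy classes are hypothetical, and the two-value result enum is a deliberate simplification. The sketch only follows the wording of the list above.

```java
/** Simplified result type; an assumption made for this sketch. */
enum CompatibilitySketch { COMPATIBLE_AS_IS, INCOMPATIBLE }

interface SerializerSketch {
    SnapshotSketch snapshot();
}

interface SnapshotSketch {
    /** Step 1: new method, called on the new snapshot with the old one; INCOMPATIBLE by default. */
    default CompatibilitySketch resolveSchemaCompatibility(SnapshotSketch oldSnapshot) {
        return CompatibilitySketch.INCOMPATIBLE;
    }

    /** Step 2: original method, called on the old snapshot; deprecated, delegates to the new one. */
    @Deprecated
    default CompatibilitySketch resolveSchemaCompatibility(SerializerSketch newSerializer) {
        return newSerializer.snapshot().resolveSchemaCompatibility(this);
    }
}

/** Step 3: a built-in snapshot implements the new method. */
final class IntSnapshotSketch implements SnapshotSketch {
    @Override
    public CompatibilitySketch resolveSchemaCompatibility(SnapshotSketch oldSnapshot) {
        return oldSnapshot instanceof IntSnapshotSketch
                ? CompatibilitySketch.COMPATIBLE_AS_IS
                : CompatibilitySketch.INCOMPATIBLE;
    }
}

final class IntSerializerSketch implements SerializerSketch {
    @Override
    public SnapshotSketch snapshot() {
        return new IntSnapshotSketch();
    }
}

/** A snapshot that overrides nothing and therefore relies on both defaults. */
final class LegacySnapshotSketch implements SnapshotSketch {}

final class SchemaCompatDemo {
    public static void main(String[] args) {
        SnapshotSketch oldInt = new IntSnapshotSketch();
        // The deprecated entry point still works and routes through the new method.
        System.out.println(oldInt.resolveSchemaCompatibility(new IntSerializerSketch())); // COMPATIBLE_AS_IS
        // An unrelated old snapshot is rejected by the new serializer's snapshot.
        System.out.println(
                new LegacySnapshotSketch().resolveSchemaCompatibility(new IntSerializerSketch())); // INCOMPATIBLE
    }
}
```

The direction of the check is the point of the change: the decision moves from the old snapshot to the snapshot of the new serializer, while existing callers of the deprecated method keep working.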





[jira] [Commented] (FLINK-34009) Apache flink: Checkpoint restoration issue on Application Mode of deployment

2024-01-12 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805937#comment-17805937
 ] 

Hangxiang Yu commented on FLINK-34009:
--

[~vrang...@in.ibm.com]

Hi, you could reorganize the information and ask for help on the user mailing list, where more 
people can help with this.

Let's report it in Jira once we have confirmed it's an issue.

> Apache flink: Checkpoint restoration issue on Application Mode of deployment
> 
>
> Key: FLINK-34009
> URL: https://issues.apache.org/jira/browse/FLINK-34009
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.18.0
> Environment: Flink version: 1.18
> Zookeeper version: 3.7.2
> Env: Custom flink docker image (with embedded application class) deployed 
> over kubernetes (v1.26.11).
>Reporter: Vijay
>Priority: Major
>
> Hi Team,
> Good day. Wishing you all a happy new year 2024.
> We are using Flink 1.18 on our Flink cluster. The Job Manager has been 
> deployed in "Application mode" and HA is disabled (high-availability.type: 
> NONE). Under this configuration we are able to start multiple jobs 
> (using env.executeAsync()) of a single application.
> Note: We have also set up checkpointing on an S3 instance with 
> RETAIN_ON_CANCELLATION mode (plus other required settings).
> Let's say we start two jobs of the same application (e.g. Jobidxxx1, 
> jobidxxx2) and they are currently running in the k8s env. If we have to 
> perform a Flink minor upgrade, or an upgrade of our application with minor 
> changes, we stop the Job Manager and Task Manager instances, perform the 
> necessary upgrade, and then start both the Job Manager and Task Manager 
> instances again. On startup we expect the jobs to be 
> restored from the last checkpoint, but the job restoration does not 
> happen on Job Manager startup. Please let us know if this is a bug or 
> the general behavior of Flink under the application mode of deployment.
> Additional information: If we enable HA (using Zookeeper) in Application 
> mode, we are able to start only one job (i.e., per-job behavior). When we 
> perform a Flink minor upgrade, or an upgrade of our application with minor 
> changes, checkpoint restoration works properly when the Job Manager and Task 
> Managers restart.
> It seems checkpoint restoration and HA are interrelated, but why does checkpoint 
> restoration not work when HA is disabled?
>  
> Please let us know if anyone has experienced similar issues or has any 
> suggestions; it would be highly appreciated. Thanks in advance for your 
> assistance.





[jira] [Commented] (FLINK-34050) Rocksdb state has space amplification after rescaling with DeleteRange

2024-01-11 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805915#comment-17805915
 ] 

Hangxiang Yu commented on FLINK-34050:
--

Thanks [~lijinzhong] and [~mayuehappy] for providing this information.

IMO, deleteRange+deleteFilesInRanges could be a good default behaviour:
 # deleteRange alone may cause extra space usage that can even last forever.
 # Adding deleteFilesInRanges should not cost much more time than before, because 
checking and then deleting files should be quick.
 # The remaining files should still contain ranges that the current keys and 
compaction will eventually access. 

Of course, we could provide some performance checks comparing 
deleteRange+deleteFilesInRanges with deleteRange alone.

> Rocksdb state has space amplification after rescaling with DeleteRange
> --
>
> Key: FLINK-34050
> URL: https://issues.apache.org/jira/browse/FLINK-34050
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Jinzhong Li
>Priority: Major
> Attachments: image-2024-01-10-21-23-48-134.png, 
> image-2024-01-10-21-24-10-983.png, image-2024-01-10-21-28-24-312.png
>
>


[jira] [Resolved] (FLINK-33946) RocksDb sets setAvoidFlushDuringShutdown to true to speed up Task Cancel

2024-01-11 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu resolved FLINK-33946.
--
Resolution: Fixed

merged aa0c1b5e into master

> RocksDb sets setAvoidFlushDuringShutdown to true to speed up Task Cancel
> 
>
> Key: FLINK-33946
> URL: https://issues.apache.org/jira/browse/FLINK-33946
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.19.0
>Reporter: Yue Ma
>Assignee: Yue Ma
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> When a Job fails, the task needs to be canceled and re-deployed. 
> RocksDBStatebackend will call RocksDB.close when disposing.
> {code:java}
> if (!shutting_down_.load(std::memory_order_acquire) &&
>     has_unpersisted_data_.load(std::memory_order_relaxed) &&
>     !mutable_db_options_.avoid_flush_during_shutdown) {
>   if (immutable_db_options_.atomic_flush) {
>     autovector<ColumnFamilyData*> cfds;
>     SelectColumnFamiliesForAtomicFlush(&cfds);
>     mutex_.Unlock();
>     Status s =
>         AtomicFlushMemTables(cfds, FlushOptions(), FlushReason::kShutDown);
>     s.PermitUncheckedError();  //**TODO: What to do on error?
>     mutex_.Lock();
>   } else {
>     for (auto cfd : *versions_->GetColumnFamilySet()) {
>       if (!cfd->IsDropped() && cfd->initialized() && !cfd->mem()->IsEmpty()) {
>         cfd->Ref();
>         mutex_.Unlock();
>         Status s = FlushMemTable(cfd, FlushOptions(), FlushReason::kShutDown);
>         s.PermitUncheckedError();  //**TODO: What to do on error?
>         mutex_.Lock();
>         cfd->UnrefAndTryDelete();
>       }
>     }
>   }
> {code}
> By default (avoid_flush_during_shutdown=false), RocksDB flushes the memtable 
> on close. When disk pressure is high or the memtable is large, this 
> flush is time-consuming, which causes the task to get stuck 
> in the Canceling stage and slows down job failover.
> In fact, it is unnecessary to flush the memtable when a Flink task is 
> closed, because the data can be replayed from the checkpoint. So we can set 
> avoid_flush_during_shutdown to true to speed up task failover.
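
The trade-off described above can be shown with a toy model that does not touch RocksDB. ToyStore and its members are hypothetical; the list copy merely stands in for the expensive memtable flush that the flag skips.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy key-value store with an in-memory buffer and an optional flush on close. */
final class ToyStore {
    private final boolean avoidFlushDuringShutdown;
    private final List<String> memtable = new ArrayList<>();
    private final List<String> flushedToDisk = new ArrayList<>();

    ToyStore(boolean avoidFlushDuringShutdown) {
        this.avoidFlushDuringShutdown = avoidFlushDuringShutdown;
    }

    void put(String value) {
        memtable.add(value);
    }

    /** Closes the store; unpersisted data is flushed only when the flag is false. */
    void close() {
        if (!avoidFlushDuringShutdown && !memtable.isEmpty()) {
            flushedToDisk.addAll(memtable); // stands in for the slow disk write
        }
        memtable.clear();
    }

    int flushedCount() {
        return flushedToDisk.size();
    }

    public static void main(String[] args) {
        ToyStore flushing = new ToyStore(false);
        flushing.put("a");
        flushing.put("b");
        flushing.close();
        System.out.println(flushing.flushedCount()); // 2

        ToyStore skipping = new ToyStore(true);
        skipping.put("a");
        skipping.put("b");
        skipping.close();
        System.out.println(skipping.flushedCount()); // 0
    }
}
```

Skipping the flush loses the buffered entries of the local store, which is acceptable in the scenario described because the state is restored from the checkpoint rather than from the closed instance.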





[jira] [Resolved] (FLINK-34051) Fix equals/hashCode/toString for SavepointRestoreSettings

2024-01-11 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu resolved FLINK-34051.
--
Fix Version/s: 1.19.0
   Resolution: Fixed

merged 7745ef83 and ecb68704 into master

> Fix equals/hashCode/toString for SavepointRestoreSettings
> -
>
> Key: FLINK-34051
> URL: https://issues.apache.org/jira/browse/FLINK-34051
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.19.0
>Reporter: Hangxiang Yu
>Assignee: Hangxiang Yu
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> SavepointRestoreSettings#equals/hashCode/toString miss the restoreMode property.
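
For illustration, a simplified stand-in class whose equals, hashCode and toString cover every property. RestoreSettingsSketch is hypothetical and is not the Flink class; the field names and the use of a String for the mode are assumptions made to keep the sketch self-contained.

```java
import java.util.Objects;

/** Simplified stand-in for a settings class with three properties. */
final class RestoreSettingsSketch {
    private final String restorePath;
    private final boolean allowNonRestoredState;
    private final String restoreMode;

    RestoreSettingsSketch(String restorePath, boolean allowNonRestoredState, String restoreMode) {
        this.restorePath = restorePath;
        this.allowNonRestoredState = allowNonRestoredState;
        this.restoreMode = restoreMode;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof RestoreSettingsSketch)) {
            return false;
        }
        RestoreSettingsSketch that = (RestoreSettingsSketch) o;
        // All three properties take part, including restoreMode.
        return allowNonRestoredState == that.allowNonRestoredState
                && Objects.equals(restorePath, that.restorePath)
                && Objects.equals(restoreMode, that.restoreMode);
    }

    @Override
    public int hashCode() {
        return Objects.hash(restorePath, allowNonRestoredState, restoreMode);
    }

    @Override
    public String toString() {
        return "RestoreSettingsSketch{restorePath='" + restorePath
                + "', allowNonRestoredState=" + allowNonRestoredState
                + ", restoreMode=" + restoreMode + "}";
    }

    public static void main(String[] args) {
        RestoreSettingsSketch claim = new RestoreSettingsSketch("s3://bucket/sp-1", false, "CLAIM");
        RestoreSettingsSketch noClaim = new RestoreSettingsSketch("s3://bucket/sp-1", false, "NO_CLAIM");
        System.out.println(claim.equals(noClaim)); // false
        System.out.println(claim);
    }
}
```

If restoreMode were left out of equals, the two instances in main would compare as equal even though they restore differently, which is the kind of bug the issue describes.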





[jira] [Commented] (FLINK-33932) Support Retry Mechanism in RocksDBStateDataTransfer

2024-01-11 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805467#comment-17805467
 ] 

Hangxiang Yu commented on FLINK-33932:
--

Sure, already assigned to you.

Please take a look at my extra comments in the thread.

> Support Retry Mechanism in RocksDBStateDataTransfer
> ---
>
> Key: FLINK-33932
> URL: https://issues.apache.org/jira/browse/FLINK-33932
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: Guojun Li
>Assignee: xiangyu feng
>Priority: Major
>  Labels: pull-request-available
>
> Currently, there is no retry mechanism for downloading and uploading RocksDB 
> state files. Any jitter in the remote filesystem might lead to a checkpoint 
> failure. By supporting a retry mechanism in RocksDBStateDataTransfer, we can 
> significantly reduce the checkpoint failure rate during the asynchronous phase.
> The exception is as follows:
> {noformat}
>  
> 2023-12-19 08:46:00,197 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Decline 
> checkpoint 2 by task 
> 5b008c2c048fa8534d648455e69f9497_fbcce2f96483f8f15e87dc6c9afd372f_183_0 of 
> job a025f19e at 
> application-6f1c6e3d-1702480803995-5093022-taskmanager-1-1 @ 
> fdbd:dc61:1a:101:0:0:0:36 (dataPort=38789).
> org.apache.flink.util.SerializedThrowable: 
> org.apache.flink.runtime.checkpoint.CheckpointException: Asynchronous task 
> checkpoint failed.
>     at 
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:320)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:155)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  ~[?:?]
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  ~[?:?]
>     at java.lang.Thread.run(Thread.java:829) [?:?]
> Caused by: org.apache.flink.util.SerializedThrowable: java.lang.Exception: 
> Could not materialize checkpoint 2 for operator GlobalGroupAggregate[132] -> 
> Calc[133] (184/500)#0.
>     at 
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:298)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     ... 4 more
> Caused by: org.apache.flink.util.SerializedThrowable: 
> java.util.concurrent.ExecutionException: java.io.IOException: Could not flush 
> to file and close the file system output stream to 
> hdfs://xxx/shared/97329cbd-fb90-45ca-92de-edf52d6e6fc0 in order to obtain the 
> stream state handle
>     at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?]
>     at java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:?]
>     at 
> org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:544)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     ... 3 more
> Caused by: org.apache.flink.util.SerializedThrowable: java.io.IOException: 
> Could not flush to file and close the file system output stream to 
> hdfs://xxx/shared/97329cbd-fb90-45ca-92de-edf52d6e6fc0 in order to obtain the 
> stream state handle
>     at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:516)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:157)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:113)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:32)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
>  ~[?:?]
>     ... 3 more
> Caused by: 
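
The retry idea requested in this issue can be sketched generically. This is not the RocksDBStateDataTransfer implementation: RetrySketch, IoCall, callWithRetry and the fixed backoff are hypothetical choices for illustration, and the issue text does not specify attempt counts or backoff behaviour.

```java
import java.io.IOException;

final class RetrySketch {
    /** Hypothetical functional interface for an I/O action such as one file upload. */
    interface IoCall<T> {
        T call() throws IOException;
    }

    /**
     * Runs the action up to maxAttempts times, sleeping backoffMillis between
     * failed attempts, and rethrows the last IOException if all attempts fail.
     */
    static <T> T callWithRetry(IoCall<T> action, int maxAttempts, long backoffMillis)
            throws IOException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        IOException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (IOException e) {
                last = e;
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(backoffMillis);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        throw new IOException("interrupted while waiting to retry", ie);
                    }
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws IOException {
        int[] calls = {0};
        // Simulated upload that fails twice with a transient error, then succeeds.
        String handle = callWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new IOException("transient failure");
            }
            return "state-handle";
        }, 5, 10L);
        System.out.println(handle + " after " + calls[0] + " attempts"); // state-handle after 3 attempts
    }
}
```

Only IOException is retried here, so programming errors still fail fast; a transient filesystem error like the one in the trace above would be retried instead of declining the checkpoint on the first failure.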

[jira] [Assigned] (FLINK-33932) Support Retry Mechanism in RocksDBStateDataTransfer

2024-01-11 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu reassigned FLINK-33932:


Assignee: xiangyu feng

> Support Retry Mechanism in RocksDBStateDataTransfer
> ---
>
> Key: FLINK-33932
> URL: https://issues.apache.org/jira/browse/FLINK-33932
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: Guojun Li
>Assignee: xiangyu feng
>Priority: Major
>  Labels: pull-request-available
>
> Currently, there is no retry mechanism for downloading and uploading RocksDB 
> state files. Any jittering of remote filesystem might lead to a checkpoint 
> failure. By supporting retry mechanism in RocksDBStateDataTransfer, we can 
> significantly reduce the failure rate of checkpoint during asynchronous phase.
> The exception is as below:
> {noformat}
>  
> 2023-12-19 08:46:00,197 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Decline 
> checkpoint 2 by task 
> 5b008c2c048fa8534d648455e69f9497_fbcce2f96483f8f15e87dc6c9afd372f_183_0 of 
> job a025f19e at 
> application-6f1c6e3d-1702480803995-5093022-taskmanager-1-1 @ 
> fdbd:dc61:1a:101:0:0:0:36 (dataPort=38789).
> org.apache.flink.util.SerializedThrowable: 
> org.apache.flink.runtime.checkpoint.CheckpointException: Asynchronous task 
> checkpoint failed.
>     at 
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:320)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:155)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  ~[?:?]
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  ~[?:?]
>     at java.lang.Thread.run(Thread.java:829) [?:?]
> Caused by: org.apache.flink.util.SerializedThrowable: java.lang.Exception: 
> Could not materialize checkpoint 2 for operator GlobalGroupAggregate[132] -> 
> Calc[133] (184/500)#0.
>     at 
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:298)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     ... 4 more
> Caused by: org.apache.flink.util.SerializedThrowable: 
> java.util.concurrent.ExecutionException: java.io.IOException: Could not flush 
> to file and close the file system output stream to 
> hdfs://xxx/shared/97329cbd-fb90-45ca-92de-edf52d6e6fc0 in order to obtain the 
> stream state handle
>     at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?]
>     at java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:?]
>     at 
> org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:544)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     ... 3 more
> Caused by: org.apache.flink.util.SerializedThrowable: java.io.IOException: 
> Could not flush to file and close the file system output stream to 
> hdfs://xxx/shared/97329cbd-fb90-45ca-92de-edf52d6e6fc0 in order to obtain the 
> stream state handle
>     at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:516)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:157)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:113)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:32)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
>  ~[?:?]
>     ... 3 more
> Caused by: org.apache.flink.util.SerializedThrowable: 
> java.net.ConnectException: Connection timed out

[jira] [Resolved] (FLINK-33881) [TtlListState]Avoid copy and update value in TtlListState#getUnexpiredOrNull

2024-01-10 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu resolved FLINK-33881.
--
Resolution: Fixed

Thanks [~lijinzhong] for the great work!

merged 907d0f32 into master

> [TtlListState]Avoid copy and update value in TtlListState#getUnexpiredOrNull
> 
>
> Key: FLINK-33881
> URL: https://issues.apache.org/jira/browse/FLINK-33881
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Jinzhong Li
>Assignee: Jinzhong Li
>Priority: Minor
>  Labels: pull-request-available
> Attachments: image-2023-12-19-21-25-21-446.png, 
> image-2023-12-19-21-26-43-518.png
>
>
> In some scenarios, 'TtlListState#getUnexpiredOrNull -> 
> elementSerializer.copy(ttlValue)' consumes a lot of CPU resources.
> !image-2023-12-19-21-25-21-446.png|width=529,height=119!
> I found that for TtlListState#getUnexpiredOrNull, even if none of the elements 
> have expired, it still needs to copy all the elements and update the whole 
> list/map in TtlIncrementalCleanup#runCleanup();
> !image-2023-12-19-21-26-43-518.png|width=505,height=266!
> I think we could optimize TtlListState#getUnexpiredOrNull by:
> 1) finding the index of the first expired element in the list;
> 2) if none is found, returning the original list;
> 3) if one is found, constructing the unexpired list (adding the preceding 
> elements to the list), then going through the subsequent elements and adding 
> the unexpired ones to the list.
> {code:java}
> public List<TtlValue<T>> getUnexpiredOrNull(@Nonnull List<TtlValue<T>> ttlValues) {
>     //...
>     int firstExpireIndex = -1;
>     for (int i = 0; i < ttlValues.size(); i++) {
>         if (TtlUtils.expired(ttlValues.get(i), ttl, currentTimestamp)) {
>             firstExpireIndex = i;
>             break;
>         }
>     }
>     if (firstExpireIndex == -1) {
>         return ttlValues;  //return the original ttlValues
>     }
>     List<TtlValue<T>> unexpired = new ArrayList<>(ttlValues.size());
>     for (int i = 0; i < ttlValues.size(); i++) {
>         if (i < firstExpireIndex) {
>             // unexpired.add(ttlValues.get(i));
>             unexpired.add(elementSerializer.copy(ttlValues.get(i)));
>         }
>         if (i > firstExpireIndex) {
>             if (!TtlUtils.expired(ttlValues.get(i), ttl, currentTimestamp)) {
>                 // unexpired.add(ttlValues.get(i));
>                 unexpired.add(elementSerializer.copy(ttlValues.get(i)));
>             }
>         }
>     }
>     //  .
> } {code}
> *In this way, the extra iteration overhead is actually very very small, but 
> the benefit when there are no expired elements is significant.*
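
For illustration, the three steps proposed above could be sketched roughly as follows. This is a minimal, self-contained sketch and not Flink's code: `TimestampedValue`, `TtlListFilter` and the simplified `expired` check are hypothetical stand-ins for TtlValue and TtlUtils, and the copy through the element serializer is left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a TTL-wrapped value: a user value plus its last-access timestamp. */
final class TimestampedValue<T> {
    final T value;
    final long lastAccessTimestamp;

    TimestampedValue(T value, long lastAccessTimestamp) {
        this.value = value;
        this.lastAccessTimestamp = lastAccessTimestamp;
    }
}

final class TtlListFilter {
    /** Simplified expiry check (assumption): expired once timestamp + ttl is not after 'now'. */
    static <T> boolean expired(TimestampedValue<T> v, long ttl, long now) {
        return v.lastAccessTimestamp + ttl <= now;
    }

    /**
     * Returns the same list instance when nothing has expired, so that a caller
     * comparing by reference can skip the state update. Otherwise returns a new
     * list holding only the unexpired elements, or null if all have expired.
     */
    static <T> List<TimestampedValue<T>> getUnexpiredOrNull(
            List<TimestampedValue<T>> values, long ttl, long now) {
        // Step 1: find the index of the first expired element.
        int firstExpired = -1;
        for (int i = 0; i < values.size(); i++) {
            if (expired(values.get(i), ttl, now)) {
                firstExpired = i;
                break;
            }
        }
        // Step 2: nothing expired, so hand back the original list untouched.
        if (firstExpired == -1) {
            return values;
        }
        // Step 3: keep everything before the first expired element, then
        // filter the remaining elements one by one.
        List<TimestampedValue<T>> unexpired = new ArrayList<>(values.size());
        for (int i = 0; i < values.size(); i++) {
            if (i == firstExpired) {
                continue;
            }
            TimestampedValue<T> v = values.get(i);
            if (i < firstExpired || !expired(v, ttl, now)) {
                unexpired.add(v);
            }
        }
        return unexpired.isEmpty() ? null : unexpired;
    }
}
```

The point of returning the original instance is that the fast path costs one pass over the list and allocates nothing.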



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-33819) Support setting CompressType in RocksDBStateBackend

2024-01-10 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu reassigned FLINK-33819:


Assignee: Yue Ma

> Support setting CompressType in RocksDBStateBackend
> ---
>
> Key: FLINK-33819
> URL: https://issues.apache.org/jira/browse/FLINK-33819
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.18.0
>Reporter: Yue Ma
>Assignee: Yue Ma
>Priority: Major
> Fix For: 1.19.0
>
> Attachments: image-2023-12-14-11-32-32-968.png, 
> image-2023-12-14-11-35-22-306.png
>
>
> Currently, RocksDBStateBackend does not support setting the compression 
> level, and Snappy is used for compression by default. But we have some 
> scenarios where compression will use a lot of CPU resources. Turning off 
> compression can significantly reduce CPU overhead. So we may need to support 
> a parameter for users to set the CompressType of Rocksdb.
>   !image-2023-12-14-11-35-22-306.png!





[jira] [Assigned] (FLINK-33946) RocksDb sets setAvoidFlushDuringShutdown to true to speed up Task Cancel

2024-01-10 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu reassigned FLINK-33946:


Assignee: Yue Ma  (was: Hangxiang Yu)

> RocksDb sets setAvoidFlushDuringShutdown to true to speed up Task Cancel
> 
>
> Key: FLINK-33946
> URL: https://issues.apache.org/jira/browse/FLINK-33946
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.19.0
>Reporter: Yue Ma
>Assignee: Yue Ma
>Priority: Major
> Fix For: 1.19.0
>
>
> When a Job fails, the task needs to be canceled and re-deployed. 
> RocksDBStatebackend will call RocksDB.close when disposing.
> {code:java}
> if (!shutting_down_.load(std::memory_order_acquire) &&
> has_unpersisted_data_.load(std::memory_order_relaxed) &&
> !mutable_db_options_.avoid_flush_during_shutdown) {
>   if (immutable_db_options_.atomic_flush) {
> autovector<ColumnFamilyData*> cfds;
> SelectColumnFamiliesForAtomicFlush(&cfds);
> mutex_.Unlock();
> Status s =
> AtomicFlushMemTables(cfds, FlushOptions(), FlushReason::kShutDown);
> s.PermitUncheckedError();  //**TODO: What to do on error?
> mutex_.Lock();
>   } else {
> for (auto cfd : *versions_->GetColumnFamilySet()) {
>   if (!cfd->IsDropped() && cfd->initialized() && !cfd->mem()->IsEmpty()) {
> cfd->Ref();
> mutex_.Unlock();
> Status s = FlushMemTable(cfd, FlushOptions(), FlushReason::kShutDown);
> s.PermitUncheckedError();  //**TODO: What to do on error?
> mutex_.Lock();
> cfd->UnrefAndTryDelete();
>   }
> }
>   } {code}
> By default (avoid_flush_during_shutdown=false), RocksDB flushes its memtables 
> on close. When disk pressure is high or the memtables are large, this flush 
> can take a long time, which leaves the task stuck in the Canceling state and 
> slows down job failover.
> In fact, flushing the memtables is unnecessary when a Flink task closes, 
> because the data can be restored from the checkpoint. So we can set 
> avoid_flush_during_shutdown to true to speed up task failover.
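
To make the effect of the flag concrete, here is a toy model of the close-time condition quoted above. It is only a sketch under stated assumptions: `ToyStore` is a hypothetical class, not the RocksDB API, and the "flush" is a simple list copy standing in for the potentially slow FlushMemTable call.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not RocksDB): a store that buffers writes in memory until they are flushed. */
final class ToyStore {
    private final boolean avoidFlushDuringShutdown;
    private final List<String> memtable = new ArrayList<>();
    private final List<String> flushedToDisk = new ArrayList<>();

    ToyStore(boolean avoidFlushDuringShutdown) {
        this.avoidFlushDuringShutdown = avoidFlushDuringShutdown;
    }

    void put(String entry) {
        memtable.add(entry);
    }

    /** Flushes on close only if there is unpersisted data and the flag is off. */
    void close() {
        boolean hasUnpersistedData = !memtable.isEmpty();
        if (hasUnpersistedData && !avoidFlushDuringShutdown) {
            // Stands in for the expensive flush that delays task cancellation.
            flushedToDisk.addAll(memtable);
        }
        // With the flag on, buffered data is simply dropped; in the Flink case
        // the issue argues it can be restored from the checkpoint instead.
        memtable.clear();
    }

    int flushedCount() {
        return flushedToDisk.size();
    }
}
```

With the flag set to true, `close()` does no flush work at all, which is the speed-up the issue is after.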





[jira] [Assigned] (FLINK-33946) RocksDb sets setAvoidFlushDuringShutdown to true to speed up Task Cancel

2024-01-10 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu reassigned FLINK-33946:


Assignee: Hangxiang Yu

> RocksDb sets setAvoidFlushDuringShutdown to true to speed up Task Cancel
> 
>
> Key: FLINK-33946
> URL: https://issues.apache.org/jira/browse/FLINK-33946
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.19.0
>Reporter: Yue Ma
>Assignee: Hangxiang Yu
>Priority: Major
> Fix For: 1.19.0
>
>
> When a Job fails, the task needs to be canceled and re-deployed. 
> RocksDBStatebackend will call RocksDB.close when disposing.
> {code:java}
> if (!shutting_down_.load(std::memory_order_acquire) &&
> has_unpersisted_data_.load(std::memory_order_relaxed) &&
> !mutable_db_options_.avoid_flush_during_shutdown) {
>   if (immutable_db_options_.atomic_flush) {
> autovector<ColumnFamilyData*> cfds;
> SelectColumnFamiliesForAtomicFlush(&cfds);
> mutex_.Unlock();
> Status s =
> AtomicFlushMemTables(cfds, FlushOptions(), FlushReason::kShutDown);
> s.PermitUncheckedError();  //**TODO: What to do on error?
> mutex_.Lock();
>   } else {
> for (auto cfd : *versions_->GetColumnFamilySet()) {
>   if (!cfd->IsDropped() && cfd->initialized() && !cfd->mem()->IsEmpty()) {
> cfd->Ref();
> mutex_.Unlock();
> Status s = FlushMemTable(cfd, FlushOptions(), FlushReason::kShutDown);
> s.PermitUncheckedError();  //**TODO: What to do on error?
> mutex_.Lock();
> cfd->UnrefAndTryDelete();
>   }
> }
>   } {code}
> By default (avoid_flush_during_shutdown=false), RocksDB flushes its memtables 
> on close. When disk pressure is high or the memtables are large, this flush 
> can take a long time, which leaves the task stuck in the Canceling state and 
> slows down job failover.
> In fact, flushing the memtables is unnecessary when a Flink task closes, 
> because the data can be restored from the checkpoint. So we can set 
> avoid_flush_during_shutdown to true to speed up task failover.





[jira] [Commented] (FLINK-34015) Setting `execution.savepoint.ignore-unclaimed-state` does not take effect when passing this parameter by dynamic properties

2024-01-10 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805359#comment-17805359
 ] 

Hangxiang Yu commented on FLINK-34015:
--

[~zhourenxiang] Fine, it's valid.

I have assigned it to you and commented on your PR. Please take a look and go 
ahead.

> Setting `execution.savepoint.ignore-unclaimed-state` does not take effect 
> when passing this parameter by dynamic properties
> ---
>
> Key: FLINK-34015
> URL: https://issues.apache.org/jira/browse/FLINK-34015
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.17.0
>Reporter: Renxiang Zhou
>Assignee: Renxiang Zhou
>Priority: Critical
>  Labels: ignore-unclaimed-state-invalid, pull-request-available
> Attachments: image-2024-01-08-14-22-09-758.png, 
> image-2024-01-08-14-24-30-665.png
>
>
> We set `execution.savepoint.ignore-unclaimed-state` to true and used the -D 
> option to submit the job, but unfortunately we found the value was still false 
> in the jobmanager log.
> Pic 1: we set `execution.savepoint.ignore-unclaimed-state` to true when 
> submitting the job.
> !image-2024-01-08-14-22-09-758.png|width=1012,height=222!
> Pic 2: The value is still false in the JM log.
> !image-2024-01-08-14-24-30-665.png|width=651,height=51!
>  
> Besides, the parameter `execution.savepoint-restore-mode` has the same 
> problem when we pass it via the -D option.
>  





[jira] [Assigned] (FLINK-34015) Setting `execution.savepoint.ignore-unclaimed-state` does not take effect when passing this parameter by dynamic properties

2024-01-10 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu reassigned FLINK-34015:


Assignee: Hangxiang Yu

> Setting `execution.savepoint.ignore-unclaimed-state` does not take effect 
> when passing this parameter by dynamic properties
> ---
>
> Key: FLINK-34015
> URL: https://issues.apache.org/jira/browse/FLINK-34015
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.17.0
>Reporter: Renxiang Zhou
>Assignee: Hangxiang Yu
>Priority: Critical
>  Labels: ignore-unclaimed-state-invalid, pull-request-available
> Attachments: image-2024-01-08-14-22-09-758.png, 
> image-2024-01-08-14-24-30-665.png
>
>
> We set `execution.savepoint.ignore-unclaimed-state` to true and used the -D 
> option to submit the job, but unfortunately we found the value was still false 
> in the jobmanager log.
> Pic 1: we set `execution.savepoint.ignore-unclaimed-state` to true when 
> submitting the job.
> !image-2024-01-08-14-22-09-758.png|width=1012,height=222!
> Pic 2: The value is still false in the JM log.
> !image-2024-01-08-14-24-30-665.png|width=651,height=51!
>  
> Besides, the parameter `execution.savepoint-restore-mode` has the same 
> problem when we pass it via the -D option.
>  





[jira] [Assigned] (FLINK-34015) Setting `execution.savepoint.ignore-unclaimed-state` does not take effect when passing this parameter by dynamic properties

2024-01-10 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu reassigned FLINK-34015:


Assignee: Renxiang Zhou  (was: Hangxiang Yu)

> Setting `execution.savepoint.ignore-unclaimed-state` does not take effect 
> when passing this parameter by dynamic properties
> ---
>
> Key: FLINK-34015
> URL: https://issues.apache.org/jira/browse/FLINK-34015
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.17.0
>Reporter: Renxiang Zhou
>Assignee: Renxiang Zhou
>Priority: Critical
>  Labels: ignore-unclaimed-state-invalid, pull-request-available
> Attachments: image-2024-01-08-14-22-09-758.png, 
> image-2024-01-08-14-24-30-665.png
>
>
> We set `execution.savepoint.ignore-unclaimed-state` to true and used the -D 
> option to submit the job, but unfortunately we found the value was still false 
> in the jobmanager log.
> Pic 1: we set `execution.savepoint.ignore-unclaimed-state` to true when 
> submitting the job.
> !image-2024-01-08-14-22-09-758.png|width=1012,height=222!
> Pic 2: The value is still false in the JM log.
> !image-2024-01-08-14-24-30-665.png|width=651,height=51!
>  
> Besides, the parameter `execution.savepoint-restore-mode` has the same 
> problem when we pass it via the -D option.
>  





[jira] [Updated] (FLINK-30088) Excessive state updates for TtlMapState and TtlListState

2024-01-10 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu updated FLINK-30088:
-
Component/s: Runtime / State Backends

> Excessive state updates for TtlMapState and TtlListState
> 
>
> Key: FLINK-30088
> URL: https://issues.apache.org/jira/browse/FLINK-30088
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Roman Boyko
>Assignee: Roman Boyko
>Priority: Minor
>  Labels: pull-request-available, stale-assigned
> Attachments: image-2022-11-18-20-25-14-466.png, 
> image-2022-11-18-20-27-24-054.png
>
>
> After FLINK-21413 was merged, every TTL cleanup check for TtlMapState and 
> TtlListState (even without expired elements) leads to an update of the whole 
> state.
> This is because of:
> - the comparison by reference inside `TtlIncrementalCleanup`:
> !image-2022-11-18-20-25-14-466.png|width=450,height=288!
> - and the creation of a new map or list inside TtlMapState or TtlListState:
> !image-2022-11-18-20-27-24-054.png|width=477,height=365!





[jira] [Created] (FLINK-34051) Fix equals/hashCode/toString for SavepointRestoreSettings

2024-01-10 Thread Hangxiang Yu (Jira)
Hangxiang Yu created FLINK-34051:


 Summary: Fix equals/hashCode/toString for SavepointRestoreSettings
 Key: FLINK-34051
 URL: https://issues.apache.org/jira/browse/FLINK-34051
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.19.0
Reporter: Hangxiang Yu
Assignee: Hangxiang Yu


SavepointRestoreSettings#equals/hashCode/toString miss the restoreMode property.
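
As a rough illustration of what including the property looks like, here is a minimal sketch. `RestoreSettingsSketch` and `SketchRestoreMode` are hypothetical stand-ins, and the other two fields are assumptions chosen for the example; this is not the actual SavepointRestoreSettings class.

```java
import java.util.Objects;

/** Hypothetical stand-in for the restore mode enum. */
enum SketchRestoreMode { CLAIM, NO_CLAIM, LEGACY }

/** Hypothetical settings holder whose equals/hashCode/toString cover every field. */
final class RestoreSettingsSketch {
    private final String restorePath;
    private final boolean allowNonRestoredState;
    private final SketchRestoreMode restoreMode;

    RestoreSettingsSketch(
            String restorePath, boolean allowNonRestoredState, SketchRestoreMode restoreMode) {
        this.restorePath = restorePath;
        this.allowNonRestoredState = allowNonRestoredState;
        this.restoreMode = restoreMode;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof RestoreSettingsSketch)) {
            return false;
        }
        RestoreSettingsSketch that = (RestoreSettingsSketch) o;
        // restoreMode takes part in the comparison, so two settings that
        // differ only in their mode are no longer considered equal.
        return allowNonRestoredState == that.allowNonRestoredState
                && Objects.equals(restorePath, that.restorePath)
                && restoreMode == that.restoreMode;
    }

    @Override
    public int hashCode() {
        return Objects.hash(restorePath, allowNonRestoredState, restoreMode);
    }

    @Override
    public String toString() {
        return "RestoreSettingsSketch{restorePath='" + restorePath
                + "', allowNonRestoredState=" + allowNonRestoredState
                + ", restoreMode=" + restoreMode + '}';
    }
}
```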





[jira] [Commented] (FLINK-34015) Setting `execution.savepoint.ignore-unclaimed-state` does not take effect when passing this parameter by dynamic properties

2024-01-10 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805160#comment-17805160
 ] 

Hangxiang Yu commented on FLINK-34015:
--

[~zhourenxiang] Thanks for reporting this.

IIUC, it's because SavepointRestoreSettings has been overridden by the none 
value passed by the CLI, right?
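
The suspected failure mode can be pictured with a small, purely hypothetical model. Nothing below is Flink code: `RestoreConfigSketch` and both merge methods are invented for illustration, and a `null` CLI flag stands for "no savepoint option was passed on the command line".

```java
import java.util.HashMap;
import java.util.Map;

final class RestoreConfigSketch {
    static final String IGNORE_UNCLAIMED = "execution.savepoint.ignore-unclaimed-state";

    /**
     * Suspected behaviour: the CLI-derived value is always written back, so an
     * absent flag collapses to the default 'false' and hides the -D value.
     */
    static Map<String, String> mergeAlwaysOverride(
            Map<String, String> dynamicProperties, Boolean cliFlag) {
        Map<String, String> effective = new HashMap<>(dynamicProperties);
        boolean fromCli = cliFlag != null && cliFlag;
        effective.put(IGNORE_UNCLAIMED, Boolean.toString(fromCli));
        return effective;
    }

    /** One possible remedy: only write the CLI value when it was actually given. */
    static Map<String, String> mergeOnlyWhenSet(
            Map<String, String> dynamicProperties, Boolean cliFlag) {
        Map<String, String> effective = new HashMap<>(dynamicProperties);
        if (cliFlag != null) {
            effective.put(IGNORE_UNCLAIMED, cliFlag.toString());
        }
        return effective;
    }
}
```

In this model a job submitted with only the -D property ends up with `false` under the first merge and keeps `true` under the second.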

> Setting `execution.savepoint.ignore-unclaimed-state` does not take effect 
> when passing this parameter by dynamic properties
> ---
>
> Key: FLINK-34015
> URL: https://issues.apache.org/jira/browse/FLINK-34015
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.17.0
>Reporter: Renxiang Zhou
>Priority: Critical
>  Labels: ignore-unclaimed-state-invalid, pull-request-available
> Attachments: image-2024-01-08-14-22-09-758.png, 
> image-2024-01-08-14-24-30-665.png
>
>
> We set `execution.savepoint.ignore-unclaimed-state` to true and use -D option 
> to submit the job, but unfortunately we found the value is still false in 
> jobmanager log.
> Pic 1: we  set `execution.savepoint.ignore-unclaimed-state` to true in 
> submiting job.
> !image-2024-01-08-14-22-09-758.png|width=1012,height=222!
> Pic 2: The value is still false in jmlog.
> !image-2024-01-08-14-24-30-665.png|width=651,height=51!
>  
> Besides, the parameter `execution.savepoint-restore-mode` has the same 
> problem since when we pass it by -D option.
>  





[jira] [Commented] (FLINK-33819) Support setting CompressType in RocksDBStateBackend

2024-01-10 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805154#comment-17805154
 ] 

Hangxiang Yu commented on FLINK-33819:
--

{quote}It is obvious that closing Compression is not only about benefits, but 
also brings some space amplification. What I want to express is that we may 
need to provide such a configuration for users to balance how to exchange space 
for time
{quote}
Yeah, that's also what we found in our production environment: using 
NoCompression for L0 and L1 improves performance at the expense of space.

So I'm +1 to introduce such a configuration. Of course, we should keep the 
default behaviour in the first version.
{quote}After switching the Compression Type, the newly generated file will be 
compressed using the new Compress Type, and the existing file can still be read 
and written with old Compress Type. 
{quote}
Yes, you're right. I have verified this. RocksDB will handle this situation.

 

[~mayuehappy] Would you like to take this?

 

> Support setting CompressType in RocksDBStateBackend
> ---
>
> Key: FLINK-33819
> URL: https://issues.apache.org/jira/browse/FLINK-33819
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.18.0
>Reporter: Yue Ma
>Priority: Major
> Fix For: 1.19.0
>
> Attachments: image-2023-12-14-11-32-32-968.png, 
> image-2023-12-14-11-35-22-306.png
>
>
> Currently, RocksDBStateBackend does not support setting the compression 
> level, and Snappy is used for compression by default. But we have some 
> scenarios where compression will use a lot of CPU resources. Turning off 
> compression can significantly reduce CPU overhead. So we may need to support 
> a parameter for users to set the CompressType of Rocksdb.
>   !image-2023-12-14-11-35-22-306.png!





[jira] [Commented] (FLINK-33946) RocksDb sets setAvoidFlushDuringShutdown to true to speed up Task Cancel

2024-01-10 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805137#comment-17805137
 ] 

Hangxiang Yu commented on FLINK-33946:
--

[~mayuehappy] Thanks for reporting this. It makes sense.

Would you like to improve this?

> RocksDb sets setAvoidFlushDuringShutdown to true to speed up Task Cancel
> 
>
> Key: FLINK-33946
> URL: https://issues.apache.org/jira/browse/FLINK-33946
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.19.0
>Reporter: Yue Ma
>Priority: Major
> Fix For: 1.19.0
>
>
> When a Job fails, the task needs to be canceled and re-deployed. 
> RocksDBStatebackend will call RocksDB.close when disposing.
> {code:java}
> if (!shutting_down_.load(std::memory_order_acquire) &&
> has_unpersisted_data_.load(std::memory_order_relaxed) &&
> !mutable_db_options_.avoid_flush_during_shutdown) {
>   if (immutable_db_options_.atomic_flush) {
> autovector<ColumnFamilyData*> cfds;
> SelectColumnFamiliesForAtomicFlush(&cfds);
> mutex_.Unlock();
> Status s =
> AtomicFlushMemTables(cfds, FlushOptions(), FlushReason::kShutDown);
> s.PermitUncheckedError();  //**TODO: What to do on error?
> mutex_.Lock();
>   } else {
> for (auto cfd : *versions_->GetColumnFamilySet()) {
>   if (!cfd->IsDropped() && cfd->initialized() && !cfd->mem()->IsEmpty()) {
> cfd->Ref();
> mutex_.Unlock();
> Status s = FlushMemTable(cfd, FlushOptions(), FlushReason::kShutDown);
> s.PermitUncheckedError();  //**TODO: What to do on error?
> mutex_.Lock();
> cfd->UnrefAndTryDelete();
>   }
> }
>   } {code}
> By default (avoid_flush_during_shutdown=false), RocksDB flushes its memtables 
> on close. When disk pressure is high or the memtables are large, this flush 
> can take a long time, which leaves the task stuck in the Canceling state and 
> slows down job failover.
> In fact, flushing the memtables is unnecessary when a Flink task closes, 
> because the data can be restored from the checkpoint. So we can set 
> avoid_flush_during_shutdown to true to speed up task failover.





[jira] [Commented] (FLINK-33932) Support retry mechanism for rocksdb uploader

2024-01-09 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17804951#comment-17804951
 ] 

Hangxiang Yu commented on FLINK-33932:
--

Thanks for pinging me here.

A retry mechanism is common when we get or put data over the network.

So I think it makes sense for checkpoint failures caused by temporary network 
problems, although it may add a bit of overhead when the failure has other causes.

Since Flink's checkpoint mechanism already retries failed checkpoints at a coarse 
granularity, this fine-grained retry looks good to me as long as it is configurable 
and doesn't change the current default behaviour.
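
A configurable retry of this kind might look roughly like the sketch below. It assumes a hypothetical helper, `RetryingTransfer.callWithRetry`, with made-up parameters `maxRetries` and `backoffMillis`; it is not the uploader's actual code. With `maxRetries` set to 0 the action runs exactly once, so the current fail-fast default is preserved.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

final class RetryingTransfer {
    /**
     * Runs the action and retries it on IOException up to maxRetries extra times,
     * sleeping backoffMillis between attempts. Other exceptions are not retried.
     */
    static <T> T callWithRetry(Callable<T> action, int maxRetries, long backoffMillis)
            throws Exception {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must not be negative");
        }
        IOException last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return action.call();
            } catch (IOException e) {
                // Remember the failure; only wait if another attempt will follow.
                last = e;
                if (attempt < maxRetries && backoffMillis > 0) {
                    Thread.sleep(backoffMillis);
                }
            }
        }
        // All attempts failed: surface the last I/O error to the caller, which
        // can then decline the checkpoint as it does today.
        throw last;
    }
}
```

Retrying only on IOException is a deliberate choice in this sketch: it targets transient network problems and leaves every other failure to the existing checkpoint-level handling.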

> Support retry mechanism for rocksdb uploader
> 
>
> Key: FLINK-33932
> URL: https://issues.apache.org/jira/browse/FLINK-33932
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: Guojun Li
>Priority: Major
>  Labels: pull-request-available
>
> Rocksdb uploader will throw exception and decline the current checkpoint if 
> there are errors when uploading to remote file system like hdfs.
> The exception is as below:
> {noformat}
>  
> 2023-12-19 08:46:00,197 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Decline 
> checkpoint 2 by task 
> 5b008c2c048fa8534d648455e69f9497_fbcce2f96483f8f15e87dc6c9afd372f_183_0 of 
> job a025f19e at 
> application-6f1c6e3d-1702480803995-5093022-taskmanager-1-1 @ 
> fdbd:dc61:1a:101:0:0:0:36 (dataPort=38789).
> org.apache.flink.util.SerializedThrowable: 
> org.apache.flink.runtime.checkpoint.CheckpointException: Asynchronous task 
> checkpoint failed.
>     at 
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:320)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:155)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  ~[?:?]
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  ~[?:?]
>     at java.lang.Thread.run(Thread.java:829) [?:?]
> Caused by: org.apache.flink.util.SerializedThrowable: java.lang.Exception: 
> Could not materialize checkpoint 2 for operator GlobalGroupAggregate[132] -> 
> Calc[133] (184/500)#0.
>     at 
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:298)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     ... 4 more
> Caused by: org.apache.flink.util.SerializedThrowable: 
> java.util.concurrent.ExecutionException: java.io.IOException: Could not flush 
> to file and close the file system output stream to 
> hdfs://xxx/shared/97329cbd-fb90-45ca-92de-edf52d6e6fc0 in order to obtain the 
> stream state handle
>     at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?]
>     at java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:?]
>     at 
> org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:544)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     ... 3 more
> Caused by: org.apache.flink.util.SerializedThrowable: java.io.IOException: 
> Could not flush to file and close the file system output stream to 
> hdfs://xxx/shared/97329cbd-fb90-45ca-92de-edf52d6e6fc0 in order to obtain the 
> stream state handle
>     at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:516)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:157)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:113)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:32)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     

[jira] [Updated] (FLINK-34030) Avoid using negative value for periodic-materialize.interval

2024-01-09 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34030?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu updated FLINK-34030:
-
Affects Version/s: 1.19.0

> Avoid using negative value for periodic-materialize.interval
> 
>
> Key: FLINK-34030
> URL: https://issues.apache.org/jira/browse/FLINK-34030
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.19.0
>Reporter: Hangxiang Yu
>Assignee: Hangxiang Yu
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> Similar to FLINK-32023, a negative value doesn't work for the Duration type.





[jira] [Resolved] (FLINK-34030) Avoid using negative value for periodic-materialize.interval

2024-01-09 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34030?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu resolved FLINK-34030.
--
Fix Version/s: 1.19.0
   Resolution: Fixed

merged a6c1f308 and 96d62017 into master

> Avoid using negative value for periodic-materialize.interval
> 
>
> Key: FLINK-34030
> URL: https://issues.apache.org/jira/browse/FLINK-34030
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Hangxiang Yu
>Assignee: Hangxiang Yu
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> Similar to FLINK-32023, a negative value doesn't work for the Duration type.





[jira] [Updated] (FLINK-34032) Cleanup local-recovery dir when switching local-recovery from enabled to disabled

2024-01-08 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu updated FLINK-34032:
-
Summary: Cleanup local-recovery dir when switching local-recovery from 
enabled to disabled  (was: Cleanup local=recovery dir when switching 
local-recovery from enabled to disabled)

> Cleanup local-recovery dir when switching local-recovery from enabled to 
> disabled
> -
>
> Key: FLINK-34032
> URL: https://issues.apache.org/jira/browse/FLINK-34032
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Hangxiang Yu
>Priority: Major
>
> When switching local-recovery from enabled to disabled, the local-recovery 
> dir is not cleaned up.
> In particular, for a job that has switched multiple times, lots of historical 
> local checkpoints will be retained forever.





[jira] [Created] (FLINK-34032) Cleanup local=recovery dir when switching local-recovery from enabled to disabled

2024-01-08 Thread Hangxiang Yu (Jira)
Hangxiang Yu created FLINK-34032:


 Summary: Cleanup local=recovery dir when switching local-recovery 
from enabled to disabled
 Key: FLINK-34032
 URL: https://issues.apache.org/jira/browse/FLINK-34032
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Reporter: Hangxiang Yu


When switching local-recovery from enabled to disabled, the local-recovery dir 
is not cleaned up.

In particular, for a job that has switched multiple times, lots of historical 
local checkpoints will be retained forever.





[jira] [Created] (FLINK-34030) Avoid using negative value for periodic-materialize.interval

2024-01-08 Thread Hangxiang Yu (Jira)
Hangxiang Yu created FLINK-34030:


 Summary: Avoid using negative value for 
periodic-materialize.interval
 Key: FLINK-34030
 URL: https://issues.apache.org/jira/browse/FLINK-34030
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Reporter: Hangxiang Yu
Assignee: Hangxiang Yu


Similar to FLINK-32023, a negative value doesn't work for Duration Type.
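
One way to read this report: if a duration parser only accepts an unsigned number followed by a unit, a negative sentinel such as "-1" cannot be used to mean "disabled", so disabling has to be expressed some other way. The sketch below illustrates that idea only. The class MaterializeIntervalSketch, the parser parseUnsignedMillis and the enabled flag are hypothetical names for illustration, not Flink's configuration code.

```java
import java.time.Duration;
import java.util.Optional;

/** Hypothetical sketch; names are illustrative and are not Flink's API. */
final class MaterializeIntervalSketch {

    /** Parses "<digits>" or "<digits> ms"; rejects text that does not start with a digit. */
    static Duration parseUnsignedMillis(String text) {
        String trimmed = text.trim();
        int pos = 0;
        while (pos < trimmed.length() && Character.isDigit(trimmed.charAt(pos))) {
            pos++;
        }
        if (pos == 0) {
            // A leading '-' ends up here, so "-1" can never act as a "disabled" marker.
            throw new NumberFormatException("Text does not start with a number: " + text);
        }
        String unit = trimmed.substring(pos).trim();
        if (!unit.isEmpty() && !unit.equals("ms")) {
            throw new IllegalArgumentException("Unsupported unit in this sketch: " + unit);
        }
        return Duration.ofMillis(Long.parseLong(trimmed.substring(0, pos)));
    }

    /** Uses an explicit (assumed) on/off flag instead of a negative interval. */
    static Optional<Duration> effectiveInterval(boolean enabled, String intervalText) {
        if (!enabled) {
            return Optional.empty();
        }
        return Optional.of(parseUnsignedMillis(intervalText));
    }
}
```

With a separate flag, the interval value is only parsed when the feature is switched on, so it never needs to carry a special negative meaning.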





[jira] [Commented] (FLINK-33856) Add metrics to monitor the interaction performance between task and external storage system in the process of checkpoint making

2023-12-26 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17800683#comment-17800683
 ] 

Hangxiang Yu commented on FLINK-33856:
--

[~Weijie Guo] Thanks for pinging me here.

[~hejufang001]

Thanks for the proposal.

I think these metrics sound reasonable.

IIUC, they are checkpoint related task-level metrics.

I think we could use the TraceReporter provided by FLINK-33695 rather than the 
current MetricReporter, for the reason mentioned in FLINK-33695.

cc [~pnowojski] 

> Add metrics to monitor the interaction performance between task and external 
> storage system in the process of checkpoint making
> ---
>
> Key: FLINK-33856
> URL: https://issues.apache.org/jira/browse/FLINK-33856
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.18.0
>Reporter: Jufang He
>Assignee: Jufang He
>Priority: Major
>  Labels: pull-request-available
>
> When Flink takes a checkpoint, the performance of its interaction with the 
> external file system has a great impact on the overall duration. Adding 
> performance metrics where the task interacts with the external file storage 
> system would therefore make bottlenecks easy to observe. These include: the 
> file write rate, the latency of writing the file, and the latency of closing 
> the file.
> Adding these metrics on the Flink side has the following advantages: it is 
> convenient to collect the E2E duration of different tasks, and there is no 
> need to distinguish the type of external storage system, because the metrics 
> can be unified in the FsCheckpointStreamFactory.





[jira] [Resolved] (FLINK-32881) Client supports making savepoints in detach mode

2023-12-26 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu resolved FLINK-32881.
--
Resolution: Fixed

merged into master via 81598f8a...d0dbd51c

> Client supports making savepoints in detach mode
> 
>
> Key: FLINK-32881
> URL: https://issues.apache.org/jira/browse/FLINK-32881
> Project: Flink
>  Issue Type: Improvement
>  Components: Client / Job Submission, Runtime / Checkpointing
>Affects Versions: 1.19.0
>Reporter: Renxiang Zhou
>Assignee: Renxiang Zhou
>Priority: Major
>  Labels: detach-savepoint, pull-request-available
> Fix For: 1.19.0
>
> Attachments: image-2023-08-16-17-14-34-740.png, 
> image-2023-08-16-17-14-44-212.png
>
>
> When triggering a savepoint using the command-line tool, the client needs to 
> wait for the job to finish creating the savepoint before it can exit. For 
> jobs with large state, the savepoint creation process can be time-consuming, 
> leading to the following problems:
>  # Platform users may need to manage thousands of Flink tasks on a single 
> client machine. With the current savepoint triggering mode, all savepoint 
> creation threads on that machine have to wait for the job to finish the 
> snapshot, resulting in significant resource waste;
>  # If the savepoint takes longer than the client's timeout duration, the 
> client will throw a timeout exception and report that triggering the 
> savepoint failed. Since different jobs have varying savepoint 
> durations, it is difficult to adjust the timeout parameter on the client side.
> Therefore, we propose adding a detach mode to trigger savepoints on the 
> client side, similar to the detach mode behavior when submitting jobs. 
> Here are some specific details:
>  # The savepoint UUID will be generated on the client side. After 
> successfully triggering the savepoint, the client immediately returns the 
> UUID information and exits.
>  # Add a "dump-pending-savepoints" API that allows the client to check 
> whether the triggered savepoint has been successfully created.
> By implementing these changes, the client can detach from the savepoint 
> creation process, reducing resource waste, and providing a way to check the 
> status of savepoint creation.
> !image-2023-08-16-17-14-34-740.png|width=2129,height=625!!image-2023-08-16-17-14-44-212.png|width=1530,height=445!
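
The detach flow described in the issue (generate an identifier on the client, return immediately, query the status later) can be sketched in a few lines. This is only an in-memory illustration under stated assumptions: DetachedSavepointSketch, triggerDetached, markCompleted and status are hypothetical names, and nothing here reflects the actual Flink client or REST API.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory model of "trigger and detach, check later". */
final class DetachedSavepointSketch {

    enum Status { PENDING, COMPLETED, UNKNOWN }

    private final Map<String, Status> savepoints = new ConcurrentHashMap<>();

    /** Registers a savepoint request and returns its id without waiting for completion. */
    String triggerDetached() {
        String id = UUID.randomUUID().toString();
        savepoints.put(id, Status.PENDING);
        return id;
    }

    /** Called by the (assumed) job side once the snapshot has been written. */
    void markCompleted(String id) {
        savepoints.replace(id, Status.PENDING, Status.COMPLETED);
    }

    /** Lets a later client invocation ask whether the savepoint finished. */
    Status status(String id) {
        return savepoints.getOrDefault(id, Status.UNKNOWN);
    }
}
```

The point of the design is that the triggering call holds no thread while the snapshot is produced; the caller keeps only the id.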





[jira] [Resolved] (FLINK-20772) RocksDBValueState with TTL occurs NullPointerException when calling update(null) method

2023-12-15 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu resolved FLINK-20772.
--
Fix Version/s: 1.19.0
   Resolution: Fixed

merged ac1ed6b6 and 9aa7a3cd into master

> RocksDBValueState with TTL occurs NullPointerException when calling 
> update(null) method 
> 
>
> Key: FLINK-20772
> URL: https://issues.apache.org/jira/browse/FLINK-20772
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.11.2
> Environment: Flink version: 1.11.2
> Flink Cluster: Standalone cluster with 3 Job managers and Task managers on 
> CentOS 7
>Reporter: Seongbae Chang
>Assignee: Zakelly Lan
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> beginner, pull-request-available
> Fix For: 1.19.0
>
>
> h2. Problem
>  * I use ValueState for my custom trigger and set a TTL for these ValueStates 
> in a RocksDB backend environment.
>  * I found an error when I used this code. I know that 
> ValueState.update(null) generally behaves the same as ValueState.clear(). 
> Unfortunately, this error occurs once TTL is used.
> {code:java}
> // My Code
> ctx.getPartitionedState(batchTotalSizeStateDesc).update(null);
> {code}
>  * I tested this in Flink 1.11.2, but I think it would also be a problem in 
> later versions.
>  * Plus, I'm a beginner. So, if there is any problem with this issue, 
> please give me advice about it, and I'll fix it! 
> {code:java}
> // Error Stacktrace
> Caused by: TimerException{org.apache.flink.util.FlinkRuntimeException: Error while adding data to RocksDB}
>   ... 12 more
> Caused by: org.apache.flink.util.FlinkRuntimeException: Error while adding data to RocksDB
>   at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:108)
>   at org.apache.flink.runtime.state.ttl.TtlValueState.update(TtlValueState.java:50)
>   at .onProcessingTime(ActionBatchTimeTrigger.java:102)
>   at .onProcessingTime(ActionBatchTimeTrigger.java:29)
>   at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onProcessingTime(WindowOperator.java:902)
>   at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:498)
>   at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1220)
>   ... 11 more
> Caused by: java.lang.NullPointerException
>   at org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:69)
>   at org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:32)
>   at org.apache.flink.api.common.typeutils.CompositeSerializer.serialize(CompositeSerializer.java:142)
>   at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValueInternal(AbstractRocksDBState.java:158)
>   at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:178)
>   at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:167)
>   at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:106)
>   ... 18 more
> {code}
>  
> h2. Reason
>  * It relates to RocksDBValueState combined with TtlValueState.
>  * In RocksDBValueState (as well as other types of ValueState), 
> *.update(null)* is supposed to be caught by the null check in the if-clause. 
> However, with TTL it passes the null check and then tries to serialize the 
> null value.
> {code:java}
> // https://github.com/apache/flink/blob/release-1.11/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java#L96-L110
> @Override
> public void update(V value) {
>     if (value == null) {
>         clear();
>         return;
>     }
>
>     try {
>         backend.db.put(
>                 columnFamily,
>                 writeOptions,
>                 serializeCurrentKeyWithGroupAndNamespace(),
>                 serializeValue(value));
>     } catch (Exception e) {
>         throw new FlinkRuntimeException("Error while adding data to RocksDB", e);
>     }
> }
> {code}
>  * This is because TtlValueState wraps the value (null) with the 
> LastAccessTime and creates a new TtlValue object holding the null value.
> {code:java}
> // https://github.com/apache/flink/blob/release-1.11/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValueState.java#L47-L51
> @Override
> public void update(T value) throws 
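
The mechanism described in the FLINK-20772 report above can be reproduced without Flink or RocksDB. The sketch below is a simplified model under stated assumptions: Wrapped and InnerState are hypothetical stand-ins for the TTL wrapper object and the underlying state, and updateWithGuard shows one possible guard, which is not necessarily the fix that was merged.

```java
/** Hypothetical model of a wrapper hiding a null user value from an inner null check. */
final class TtlNullUpdateSketch {

    /** Stand-in for a TTL wrapper object: the user value plus a last-access timestamp. */
    static final class Wrapped<T> {
        final T userValue;
        final long lastAccess;

        Wrapped(T userValue, long lastAccess) {
            this.userValue = userValue;
            this.lastAccess = lastAccess;
        }
    }

    /** Stand-in for the inner state: update(null) is treated as clear(). */
    static final class InnerState<V> {
        private V stored;

        void update(V value) {
            if (value == null) {
                clear();
                return;
            }
            stored = value; // the real backend would serialize the value here
        }

        void clear() {
            stored = null;
        }

        V get() {
            return stored;
        }
    }

    /** Wraps unconditionally, so the inner null check never sees the null user value. */
    static <T> void updateWithoutGuard(InnerState<Wrapped<T>> inner, T value, long now) {
        inner.update(new Wrapped<>(value, now));
    }

    /** One possible guard: treat a null user value as clear() before wrapping. */
    static <T> void updateWithGuard(InnerState<Wrapped<T>> inner, T value, long now) {
        if (value == null) {
            inner.clear();
            return;
        }
        inner.update(new Wrapped<>(value, now));
    }
}
```

In the unguarded variant the inner state ends up holding a non-null wrapper whose user value is null, which is the object a serializer would later fail on.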

[jira] [Commented] (FLINK-33819) Support setting CompressType in RocksDBStateBackend

2023-12-14 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17796992#comment-17796992
 ] 

Hangxiang Yu commented on FLINK-33819:
--

Linked FLINK-20684 as it has been discussed before.

Linked FLINK-11313, which discusses LZ4 compression, which should be more 
usable than Snappy.

IMO, it should improve performance if we disable the compression of L0/L1 in 
some scenarios.

[~mayuehappy] Do you have any test results for it?

BTW, if we'd like to introduce such an option, it's better to guarantee 
compatibility.

> Support setting CompressType in RocksDBStateBackend
> ---
>
> Key: FLINK-33819
> URL: https://issues.apache.org/jira/browse/FLINK-33819
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.18.0
>Reporter: Yue Ma
>Priority: Major
> Fix For: 1.19.0
>
> Attachments: image-2023-12-14-11-32-32-968.png, 
> image-2023-12-14-11-35-22-306.png
>
>
> Currently, RocksDBStateBackend does not support setting the compression 
> level, and Snappy is used for compression by default. But we have some 
> scenarios where compression will use a lot of CPU resources. Turning off 
> compression can significantly reduce CPU overhead. So we may need to support 
> a parameter for users to set the CompressType of Rocksdb.
>   !image-2023-12-14-11-35-22-306.png!





[jira] [Resolved] (FLINK-33723) Disallow triggering incremental checkpoint explicitly from REST API

2023-12-13 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu resolved FLINK-33723.
--
Resolution: Fixed

merged 07d159bf into master

> Disallow triggering incremental checkpoint explicitly from REST API
> ---
>
> Key: FLINK-33723
> URL: https://issues.apache.org/jira/browse/FLINK-33723
> Project: Flink
>  Issue Type: Improvement
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> Currently, when a job is configured to run with incremental checkpoints 
> disabled, a user who manually triggers an incremental checkpoint actually 
> triggers a full checkpoint. That is because the files from a full checkpoint 
> cannot be shared with an incremental checkpoint. So it is better to remove 
> the "INCREMENTAL" option for triggering a checkpoint from the REST API to 
> avoid misunderstanding.





[jira] [Commented] (FLINK-26585) State Processor API: Loading a state set buffers the whole state set in memory before starting to process

2023-12-11 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795627#comment-17795627
 ] 

Hangxiang Yu commented on FLINK-26585:
--

[~jingge] I saw that 1.18.1 is being released.

Could this PR be included in 1.18.1 or the next 1.18.2?

It could help a lot in the case [~czchen] mentioned.

> State Processor API: Loading a state set buffers the whole state set in 
> memory before starting to process
> -
>
> Key: FLINK-26585
> URL: https://issues.apache.org/jira/browse/FLINK-26585
> Project: Flink
>  Issue Type: Improvement
>  Components: API / State Processor
>Affects Versions: 1.13.0, 1.14.0, 1.15.0
>Reporter: Matthias Schwalbe
>Assignee: Matthias Schwalbe
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
> Attachments: MultiStateKeyIteratorNoStreams.java
>
>
> * When loading a state, MultiStateKeyIterator loads and buffers the whole 
> state in memory before it even processes a single data point 
>  ** This is absolutely no problem for small state (hence the unit tests work 
> fine)
>  ** MultiStateKeyIterator ctor sets up a java Stream that iterates all state 
> descriptors and flattens all datapoints contained within
>  ** The java.util.stream.Stream#flatMap function causes the buffering of the 
> whole data set when enumerated later on
>  ** See call stack [1] 
>  *** In our case this is 150e6 data points (> 1GiB just for the pointers to 
> the data, let alone the data itself ~30GiB)
>  ** I’m not aware of some instrumentation of Stream in order to avoid the 
> problem, hence
>  ** I coded an alternative implementation of MultiStateKeyIterator that 
> avoids using java Stream,
>  ** I can contribute our implementation (MultiStateKeyIteratorNoStreams)
> [1]
> Streams call stack:
> hasNext:77, RocksStateKeysIterator 
> (org.apache.flink.contrib.streaming.state.iterator)
> next:82, RocksStateKeysIterator 
> (org.apache.flink.contrib.streaming.state.iterator)
> forEachRemaining:116, Iterator (java.util)
> forEachRemaining:1801, Spliterators$IteratorSpliterator (java.util)
> forEach:580, ReferencePipeline$Head (java.util.stream)
> accept:270, ReferencePipeline$7$1 (java.util.stream)  
>  #  Stream flatMap(final Function Stream> var1)
> accept:373, ReferencePipeline$11$1 (java.util.stream) 
>  # Stream peek(final Consumer var1)
> accept:193, ReferencePipeline$3$1 (java.util.stream)      
>  #  Stream map(final Function 
> var1)
> tryAdvance:1359, ArrayList$ArrayListSpliterator (java.util)
> lambda$initPartialTraversalState$0:294, 
> StreamSpliterators$WrappingSpliterator (java.util.stream)
> getAsBoolean:-1, 1528195520 
> (java.util.stream.StreamSpliterators$WrappingSpliterator$$Lambda$57)
> fillBuffer:206, StreamSpliterators$AbstractWrappingSpliterator 
> (java.util.stream)
> doAdvance:161, StreamSpliterators$AbstractWrappingSpliterator 
> (java.util.stream)
> tryAdvance:300, StreamSpliterators$WrappingSpliterator (java.util.stream)
> hasNext:681, Spliterators$1Adapter (java.util)
> hasNext:83, MultiStateKeyIterator (org.apache.flink.state.api.input)
> hasNext:162, KeyedStateReaderOperator$NamespaceDecorator 
> (org.apache.flink.state.api.input.operator)
> reachedEnd:215, KeyedStateInputFormat (org.apache.flink.state.api.input)
> invoke:191, DataSourceTask (org.apache.flink.runtime.operators)
> doRun:776, Task (org.apache.flink.runtime.taskmanager)
> run:563, Task (org.apache.flink.runtime.taskmanager)
> run:748, Thread (java.lang)
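
The alternative mentioned in the issue, flattening several key iterators without java.util.stream, can be sketched as a plain iterator that pulls from one inner iterator at a time. This is a generic illustration under stated assumptions: LazyFlatteningIterator is a hypothetical name and is not the attached MultiStateKeyIteratorNoStreams.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Hypothetical sketch: flattens an iterator of iterators one element at a time. */
final class LazyFlatteningIterator<T> implements Iterator<T> {

    private final Iterator<? extends Iterator<? extends T>> outer;
    private Iterator<? extends T> current = Collections.emptyIterator();

    LazyFlatteningIterator(Iterator<? extends Iterator<? extends T>> outer) {
        this.outer = outer;
    }

    @Override
    public boolean hasNext() {
        // Advance to the next non-empty inner iterator; nothing is buffered.
        while (!current.hasNext() && outer.hasNext()) {
            current = outer.next();
        }
        return current.hasNext();
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return current.next();
    }
}
```

Because each call to next() touches only the current inner iterator, memory use does not grow with the total number of elements.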





[jira] [Commented] (FLINK-26586) FileSystem uses unbuffered read I/O

2023-12-11 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795577#comment-17795577
 ] 

Hangxiang Yu commented on FLINK-26586:
--

[~Matthias Schwalbe] Sure. Just feel free to contribute it.

Already assigned to you, please go ahead.

> FileSystem uses unbuffered read I/O
> ---
>
> Key: FLINK-26586
> URL: https://issues.apache.org/jira/browse/FLINK-26586
> Project: Flink
>  Issue Type: Improvement
>  Components: API / State Processor, Connectors / FileSystem, Runtime 
> / Checkpointing
>Affects Versions: 1.13.0, 1.14.0
>Reporter: Matthias Schwalbe
>Assignee: Matthias Schwalbe
>Priority: Major
> Attachments: BufferedFSDataInputStreamWrapper.java, 
> BufferedLocalFileSystem.java
>
>
> - I found out that, at least when using LocalFileSystem on a Windows system, 
> read I/O to load a savepoint is unbuffered,
>  - See example stack [1]
>  - i.e. in order to load only a long in a serializer, it needs to go into 
> kernel mode 8 times and load the 8 bytes one by one
>  - I coded a BufferedFSDataInputStreamWrapper that allows opting in to buffered 
> reads on any FileSystem implementation
>  - In our setting savepoint load is now 30 times faster
>  - I’ve once seen a Jira ticket as to improve savepoint load time in general 
> (lost the link unfortunately), maybe this approach can help with it
>  - not sure if HDFS has got the same problem
>  - I can contribute my implementation of a BufferedFSDataInputStreamWrapper 
> which can be integrated in any 
> [1] unbuffered reads stack:
> read:207, FileInputStream (java.io)
> read:68, LocalDataInputStream (org.apache.flink.core.fs.local)
> read:50, FSDataInputStreamWrapper (org.apache.flink.core.fs)
> read:42, ForwardingInputStream (org.apache.flink.runtime.util)
> readInt:390, DataInputStream (java.io)
> deserialize:80, BytePrimitiveArraySerializer 
> (org.apache.flink.api.common.typeutils.base.array)
> next:298, FullSnapshotRestoreOperation$KeyGroupEntriesIterator 
> (org.apache.flink.runtime.state.restore)
> next:273, FullSnapshotRestoreOperation$KeyGroupEntriesIterator 
> (org.apache.flink.runtime.state.restore)
> restoreKVStateData:147, RocksDBFullRestoreOperation 
> (org.apache.flink.contrib.streaming.state.restore)
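
The effect described in the issue, eight single-byte reads reaching the underlying stream versus one bulk read when a buffer sits in between, can be shown with the JDK alone. The sketch below uses java.io.BufferedInputStream and a hypothetical CountingInputStream; it is not the attached BufferedFSDataInputStreamWrapper and does not touch Flink's FileSystem classes.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

/** Hypothetical sketch: counts how many read calls reach the underlying stream. */
final class BufferedReadSketch {

    /** Counts every read call that reaches the wrapped stream. */
    static final class CountingInputStream extends FilterInputStream {
        int calls;

        CountingInputStream(InputStream in) {
            super(in);
        }

        @Override
        public int read() throws IOException {
            calls++;
            return super.read();
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            calls++;
            return super.read(b, off, len);
        }
    }

    /** Reads 8 bytes one by one and reports how many calls hit the source. */
    static int underlyingCalls(boolean buffered) {
        CountingInputStream source =
                new CountingInputStream(new ByteArrayInputStream(new byte[8]));
        InputStream in = buffered ? new BufferedInputStream(source, 4096) : source;
        try {
            for (int i = 0; i < 8; i++) {
                in.read();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return source.calls;
    }
}
```

With a real file, each call that reaches the source corresponds to a system call, which is where the reported speed-up would come from.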



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-26586) FileSystem uses unbuffered read I/O

2023-12-11 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu reassigned FLINK-26586:


Assignee: Matthias Schwalbe

> FileSystem uses unbuffered read I/O
> ---
>
> Key: FLINK-26586
> URL: https://issues.apache.org/jira/browse/FLINK-26586
> Project: Flink
>  Issue Type: Improvement
>  Components: API / State Processor, Connectors / FileSystem, Runtime 
> / Checkpointing
>Affects Versions: 1.13.0, 1.14.0
>Reporter: Matthias Schwalbe
>Assignee: Matthias Schwalbe
>Priority: Major
> Attachments: BufferedFSDataInputStreamWrapper.java, 
> BufferedLocalFileSystem.java
>
>
> - I found out that, at least when using LocalFileSystem on a Windows system, 
> read I/O to load a savepoint is unbuffered,
>  - See example stack [1]
>  - i.e. in order to load only a long in a serializer, it needs to go into 
> kernel mode 8 times and load the 8 bytes one by one
>  - I coded a BufferedFSDataInputStreamWrapper that allows opting in to buffered 
> reads on any FileSystem implementation
>  - In our setting savepoint load is now 30 times faster
>  - I’ve once seen a Jira ticket as to improve savepoint load time in general 
> (lost the link unfortunately), maybe this approach can help with it
>  - not sure if HDFS has got the same problem
>  - I can contribute my implementation of a BufferedFSDataInputStreamWrapper 
> which can be integrated in any 
> [1] unbuffered reads stack:
> read:207, FileInputStream (java.io)
> read:68, LocalDataInputStream (org.apache.flink.core.fs.local)
> read:50, FSDataInputStreamWrapper (org.apache.flink.core.fs)
> read:42, ForwardingInputStream (org.apache.flink.runtime.util)
> readInt:390, DataInputStream (java.io)
> deserialize:80, BytePrimitiveArraySerializer 
> (org.apache.flink.api.common.typeutils.base.array)
> next:298, FullSnapshotRestoreOperation$KeyGroupEntriesIterator 
> (org.apache.flink.runtime.state.restore)
> next:273, FullSnapshotRestoreOperation$KeyGroupEntriesIterator 
> (org.apache.flink.runtime.state.restore)
> restoreKVStateData:147, RocksDBFullRestoreOperation 
> (org.apache.flink.contrib.streaming.state.restore)





[jira] [Commented] (FLINK-33798) Automatically clean up rocksdb logs when the task failover.

2023-12-11 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795576#comment-17795576
 ] 

Hangxiang Yu commented on FLINK-33798:
--

Thanks [~fanrui] for pinging me here.

I think you are right. The behaviour after relocating is not consistent with 
the behaviour before.

We could fix it.

Thanks [~liming] for reporting this and we could go ahead.

> Automatically clean up rocksdb logs when the task failover.
> ---
>
> Key: FLINK-33798
> URL: https://issues.apache.org/jira/browse/FLINK-33798
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Ming Li
>Assignee: Ming Li
>Priority: Major
>
> Since FLINK-24785 relocates rocksdb log, multiple rocksdb logs will be 
> created under the flink log directory, but they are not cleaned up during 
> task failover, resulting in a large number of rocksdb logs under the flink 
> log directory.





[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.

2023-12-07 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17794190#comment-17794190
 ] 

Hangxiang Yu commented on FLINK-27681:
--

{quote}Yes, I think we don't need any extra protection for corruption of the 
local files. From the document you shared RocksDB will throw some error every 
time we try to read a corrupted block
{quote}
Yes, reading a corrupted block is always checked, so that path is safe.

But a write operation (e.g. flush, compaction) may introduce a new corrupted 
file which may not be checked.

And during a checkpoint the corrupted file may simply be uploaded to remote 
storage without any check (such as the block checksum verified on read) 
if we don't check it manually.
{quote}Now I'm not so sure about it. Now that I think about it more, checksums 
on the filesystem level or the HDD/SSD level wouldn't protect us from a 
corruption happening after reading the bytes from local file, but before those 
bytes are acknowledged by the DFS/object store. 
{quote}
Yes, you're right. That's what I mentioned before about the end-to-end checksum 
(verifying file correctness from local to remote with a unified checksum). And 
thanks for sharing the detailed information about S3.

"But this may introduce a new API in some public classes like FileSystem which 
is a bigger topic." Maybe this needs a FLIP?

We have also tried adding this end-to-end checksum in our internal Flink 
version, and it is doable for many file systems.

We could also contribute this after we have verified the benefits and the 
performance cost, if it is worth doing.

> Improve the availability of Flink when the RocksDB file is corrupted.
> -
>
> Key: FLINK-27681
> URL: https://issues.apache.org/jira/browse/FLINK-27681
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Ming Li
>Assignee: Yue Ma
>Priority: Critical
>  Labels: pull-request-available
> Attachments: image-2023-08-23-15-06-16-717.png
>
>
> We have several times encountered RocksDB checksum mismatches or block 
> verification failures when a job is restored. The cause is generally some 
> problem with the machine where the task is located, which makes the files 
> uploaded to HDFS incorrect, but a long time (a dozen minutes to half an 
> hour) had already passed when we found the problem. I'm not sure if anyone 
> else has had a similar problem.
> Since this file is referenced by incremental checkpoints for a long time, 
> once the maximum number of retained checkpoints is exceeded, we can only 
> keep using this file until it is no longer referenced. When the job fails, 
> it cannot be recovered.
> Therefore we consider:
> 1. Can RocksDB periodically check whether all files are correct and find the 
> problem in time?
> 2. Can Flink automatically roll back to the previous checkpoint when there is 
> a problem with the checkpoint data, because even with manual intervention, it 
> just tries to recover from the existing checkpoint or discard the entire 
> state.
> 3. Can we increase the maximum number of references to a file based on the 
> maximum number of checkpoints reserved? When the number of references exceeds 
> the maximum number of checkpoints -1, the Task side is required to upload a 
> new file for this reference. Not sure if this way will ensure that the new 
> file we upload will be correct.





[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.

2023-12-05 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17793189#comment-17793189
 ] 

Hangxiang Yu commented on FLINK-27681:
--

Sorry for the late reply.
{quote}However,  the job must fail in the future(When the corrupted block is 
read or compacted, or checkpoint failed number >= tolerable-failed-checkpoint). 
Then it will rollback to the older checkpoint.

The older checkpoint must be before we found the file is corrupted. Therefore, 
it is useless to run a job between the time it is discovered that the file is 
corrupted and the time it actually fails.

In brief, tolerable-failed-checkpoint can work, but the extra cost isn't 
necessary.

BTW, if failing job directly, this 
[comment|https://github.com/apache/flink/pull/23765#discussion_r1404136470] 
will be solved directly.
{quote}
Thanks for the detailed clarification.

I rethought this; it seems that failing the job is more reasonable than failing 
the current checkpoint. I'm +1 if we could do that.
{quote}That's a non trivial overhead. Prolonging checkpoint for 10s in many 
cases (especially low throughput large state jobs) will be prohibitively 
expensive, delaying rescaling, e2e exactly once latency, etc. 1s+ for 1GB might 
also be less than ideal to enable by default.
{quote}
Cannot agree more.
{quote}Actually, aren't all of the disks basically have some form of CRC these 
days? I'm certain that's true about SSDs. Having said that, can you 
[~masteryhx] rephrase and elaborate on those 3 scenarios that you think we need 
to protect from? Especially where does the corruption happen?
{quote}
IIUC, once we have IO operations on an SST file, the file may be corrupted, 
even if this happens very rarely.

RocksDB also describes some scenarios for using the full file checksum [1] 
that are related to our usage:
 # Local file that is about to be uploaded: see "verify the SST 
file when the whole file is read in DB (e.g., compaction)." in [1]. The 
block-level checksum at runtime cannot guarantee the correctness of the whole 
SST file, so this is what we could focus on first.
 # Uploading and downloading: firstly, disk IO and network IO may corrupt the 
data. Secondly, remote storage is not always reliable. So the checksum could 
be used when SST files are copied to other places (e.g., backup, move, or 
replicate) or stored remotely.

IIUC, an SST-level checksum could guarantee the correctness of the local file.

And a filesystem-level checksum could guarantee the correctness of uploading 
and downloading.

[1] 
https://github.com/facebook/rocksdb/wiki/Full-File-Checksum-and-Checksum-Handoff
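
The end-to-end idea discussed in this thread (compute a checksum over the local file and compare it with a checksum over what the remote side returns) can be sketched with the JDK's CRC32. This is only an illustration under stated assumptions: EndToEndChecksumSketch and its methods are hypothetical, byte arrays stand in for SST files, and neither RocksDB's full file checksum nor any Flink FileSystem API is used.

```java
import java.util.zip.CRC32;

/** Hypothetical sketch of comparing a source-side checksum with the transferred bytes. */
final class EndToEndChecksumSketch {

    /** Computes a CRC32 over the complete content. */
    static long checksum(byte[] content) {
        CRC32 crc = new CRC32();
        crc.update(content, 0, content.length);
        return crc.getValue();
    }

    /** Returns true if the bytes read back still match the checksum taken at the source. */
    static boolean matchesAfterTransfer(long checksumAtSource, byte[] contentReadBack) {
        return checksumAtSource == checksum(contentReadBack);
    }
}
```

Such a check adds a full pass over the data, which is the overhead discussed in the comments above.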

> Improve the availability of Flink when the RocksDB file is corrupted.
> -
>
> Key: FLINK-27681
> URL: https://issues.apache.org/jira/browse/FLINK-27681
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Ming Li
>Assignee: Yue Ma
>Priority: Critical
>  Labels: pull-request-available
> Attachments: image-2023-08-23-15-06-16-717.png
>
>
> We have several times encountered RocksDB checksum mismatches or block 
> verification failures when a job is restored. The cause is generally some 
> problem with the machine where the task is located, which makes the files 
> uploaded to HDFS incorrect, but a long time (a dozen minutes to half an 
> hour) had already passed when we found the problem. I'm not sure if anyone 
> else has had a similar problem.
> Since this file is referenced by incremental checkpoints for a long time, 
> once the maximum number of retained checkpoints is exceeded, we can only 
> keep using this file until it is no longer referenced. When the job fails, 
> it cannot be recovered.
> Therefore we consider:
> 1. Can RocksDB periodically check whether all files are correct and find the 
> problem in time?
> 2. Can Flink automatically roll back to the previous checkpoint when there is 
> a problem with the checkpoint data, because even with manual intervention, it 
> just tries to recover from the existing checkpoint or discard the entire 
> state.
> 3. Can we increase the maximum number of references to a file based on the 
> maximum number of checkpoints reserved? When the number of references exceeds 
> the maximum number of checkpoints -1, the Task side is required to upload a 
> new file for this reference. Not sure if this way will ensure that the new 
> file we upload will be correct.





[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.

2023-11-29 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17791409#comment-17791409
 ] 

Hangxiang Yu commented on FLINK-27681:
--

{quote}Fail job directly is fine for me, but I guess the PR doesn't fail the 
job, it just fails the current checkpoint, right?
{quote}
Yeah, I think failing the checkpoint may also be fine for now. It will not 
affect the correctness of the running job.

The downside is that the job has to roll back to an older checkpoint. But there 
should be some policies for high-quality jobs, just as [~mayuehappy] mentioned.
{quote}If the checksum is called for each reading, can we think the check is 
very quick? If so, could we enable it directly without any option? Hey 
[~mayuehappy]  , could you provide some simple benchmark here?
{quote}
The check at runtime is at block level, so its overhead should be small (RocksDB 
always needs to read the block from disk at runtime, so the checksum can 
be calculated easily).

But a file-level checksum always comes with extra overhead, and the 
overhead will be bigger if the state is very large, so that's why I'd like to 
suggest it as an option. I also appreciate and look forward to the benchmark 
result from [~mayuehappy].

> Improve the availability of Flink when the RocksDB file is corrupted.
> -
>
> Key: FLINK-27681
> URL: https://issues.apache.org/jira/browse/FLINK-27681
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Ming Li
>Assignee: Yue Ma
>Priority: Critical
>  Labels: pull-request-available
> Attachments: image-2023-08-23-15-06-16-717.png
>
>
> We have encountered several times when the RocksDB checksum does not match or 
> the block verification fails when the job is restored. The reason for this 
> situation is generally that there are some problems with the machine where 
> the task is located, which causes the files uploaded to HDFS to be incorrect, 
> but it has been a long time (a dozen minutes to half an hour) when we found 
> this problem. I'm not sure if anyone else has had a similar problem.
> Since this file is referenced by incremental checkpoints for a long time, 
> when the maximum number of checkpoints reserved is exceeded, we can only use 
> this file until it is no longer referenced. When the job failed, it cannot be 
> recovered.
> Therefore we consider:
> 1. Can RocksDB periodically check whether all files are correct and find the 
> problem in time?
> 2. Can Flink automatically roll back to the previous checkpoint when there is 
> a problem with the checkpoint data, because even with manual intervention, it 
> just tries to recover from the existing checkpoint or discard the entire 
> state.
> 3. Can we increase the maximum number of references to a file based on the 
> maximum number of checkpoints reserved? When the number of references exceeds 
> the maximum number of checkpoints -1, the Task side is required to upload a 
> new file for this reference. Not sure if this way will ensure that the new 
> file we upload will be correct.





[jira] [Commented] (FLINK-33679) RestoreMode uses NO_CLAIM as default instead of LEGACY

2023-11-29 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17791403#comment-17791403
 ] 

Hangxiang Yu commented on FLINK-33679:
--

Hi, NO_CLAIM should be more production-friendly than LEGACY mode, so it is the 
better default option.

Do you have any other concerns about this?

> RestoreMode uses NO_CLAIM as default instead of LEGACY
> --
>
> Key: FLINK-33679
> URL: https://issues.apache.org/jira/browse/FLINK-33679
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Runtime / State Backends
>Reporter: junzhong qin
>Priority: Minor
>
> RestoreMode uses NO_CLAIM as default instead of LEGACY.
> {code:java}
> public enum RestoreMode implements DescribedEnum {
>     CLAIM(
>             "Flink will take ownership of the given snapshot. It will clean the"
>                     + " snapshot once it is subsumed by newer ones."),
>     NO_CLAIM(
>             "Flink will not claim ownership of the snapshot files. However it will make sure it"
>                     + " does not depend on any artefacts from the restored snapshot. In order to do that,"
>                     + " Flink will take the first checkpoint as a full one, which means it might"
>                     + " reupload/duplicate files that are part of the restored checkpoint."),
>     LEGACY(
>             "This is the mode in which Flink worked so far. It will not claim ownership of the"
>                     + " snapshot and will not delete the files. However, it can directly depend on"
>                     + " the existence of the files of the restored checkpoint. It might not be safe"
>                     + " to delete checkpoints that were restored in legacy mode ");
>
>     private final String description;
>
>     RestoreMode(String description) {
>         this.description = description;
>     }
>
>     @Override
>     @Internal
>     public InlineElement getDescription() {
>         return text(description);
>     }
>
>     public static final RestoreMode DEFAULT = NO_CLAIM;
> }
> {code}





[jira] [Comment Edited] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.

2023-11-26 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789903#comment-17789903
 ] 

Hangxiang Yu edited comment on FLINK-27681 at 11/27/23 4:33 AM:


[~pnowojski] [~fanrui]  Thanks for joining in the discussion.

Thanks [~mayuehappy] for explaining the case, which we also saw in our 
production environment.

Let me also try to share my thoughts about your questions.
{quote}I'm worried that flink add check can't completely solve the problem of 
file corruption. Is it possible that file corruption occurs after flink check 
but before uploading the file to hdfs?
{quote}
I think the concern is right.

Actually, file corruption may occur in all stages:
 # File generation at runtime (RocksDB memtable flush or compaction)
 # Uploading during a checkpoint (local file -> memory buffer -> network 
transfer -> DFS)
 # Downloading during recovery (the reverse of path 2)

 

For the first situation: 
 * File corruption will not affect the read path because the checksum is 
checked when reading a RocksDB block. The job will fail over when it reads the 
corrupted one.
 * So the core problem is that a corrupted file which is not read at runtime 
will be uploaded to the remote DFS during a checkpoint. It will affect normal 
processing after a failover, which has severe consequences, especially for 
high-priority jobs.
 * That's why we'd like to focus on not uploading corrupted files (or simply 
failing the job so that it restores from the last complete checkpoint).

For the second and third situations:
 * The ideal way is to unify the checksum mechanism of the local DB and the 
remote DFS.
 * Many FileSystems support passing the file checksum and verifying it on their 
remote server. We could use this to verify the checksum end-to-end.
 * But this may introduce a new API in some public classes like FileSystem, 
which is a bigger topic.
 * We have also seen many cases like the one [~mayuehappy] mentioned, so I 
think we could resolve this case first. I'd also like to see us take the ideal 
approach if it is worth doing.
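
As a rough illustration of the second situation, the sketch below computes a 
checksum over the bytes actually read during the upload and compares it with a 
checksum recorded when the file was generated. Everything here is an 
assumption for illustration: the class name is hypothetical, CRC32 stands in 
for whatever checksum the local DB and the DFS would agree on, and a local path 
stands in for the remote file system. A real end-to-end check would also need 
the remote side to verify what it received.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;

// Hypothetical helper; a local path stands in for the remote DFS.
final class VerifiedUpload {
    private VerifiedUpload() {}

    // Copies local to remote and fails if the bytes read do not match expectedCrc.
    static void upload(Path local, Path remote, long expectedCrc) throws IOException {
        long seen;
        try (CheckedInputStream in =
                        new CheckedInputStream(Files.newInputStream(local), new CRC32());
                OutputStream out = Files.newOutputStream(remote)) {
            byte[] buffer = new byte[64 * 1024];
            int read;
            while ((read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
            seen = in.getChecksum().getValue();
        }
        if (seen != expectedCrc) {
            // Do not leave a copy behind that a later checkpoint could reference.
            Files.deleteIfExists(remote);
            throw new IOException(
                    "Checksum mismatch for " + local + ": expected " + expectedCrc
                            + " but read " + seen);
        }
    }
}
```

The checksum is computed on the same pass that copies the data, so the file is 
not read a second time.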



> Improve the availability of Flink when the RocksDB file is corrupted.
> -
>
> Key: FLINK-27681
> URL: https://issues.apache.org/jira/browse/FLINK-27681
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Ming Li
>Assignee: Yue Ma
>Priority: Critical
>  Labels: pull-request-available
> Attachments: image-2023-08-23-15-06-16-717.png
>
>
> We have encountered several times when the RocksDB checksum does not match or 
> the block verification fails when the job is restored. The reason for this 
> situation is generally that there are some problems with the machine where 
> the task is located, which causes the files uploaded to HDFS to be incorrect, 
> but it has been a long time (a dozen minutes to half an hour) when we found 
> this problem. I'm not sure if anyone else has 

[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.

2023-11-26 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789903#comment-17789903
 ] 

Hangxiang Yu commented on FLINK-27681:
--

[~pnowojski] [~fanrui]  Thanks for joining in the discussion.

Thanks [~mayuehappy] for explaining the case, which we also saw in our 
production environment.

Let me also try to share my thoughts about your questions.
{quote}I'm worried that flink add check can't completely solve the problem of 
file corruption. Is it possible that file corruption occurs after flink check 
but before uploading the file to hdfs?
{quote}
I think the concern is right.

Actually, file corruption may occur in all stages:
 # File generation at runtime (RocksDB memtable flush or compaction)
 # Uploading during a checkpoint (local file -> memory buffer -> network 
transfer -> DFS)
 # Downloading during recovery (the reverse of path 2)

 

For the first situation: 
 * File corruption will not affect the read path because the checksum is 
checked when reading a RocksDB block. The job will fail over when it reads the 
corrupted one.
 * So the core problem is that a corrupted file which is not read at runtime 
will be uploaded to the remote DFS during a checkpoint. It will affect normal 
processing after a failover, which has severe consequences, especially for 
high-priority jobs.
 * That's why we'd like to focus on not uploading corrupted files.

For the second and third situations:
 * The ideal way is to unify the checksum mechanism of the local DB and the 
remote DFS.
 * Many FileSystems support passing the file checksum and verifying it on their 
remote server. We could use this to verify the checksum end-to-end.
 * But this may introduce a new API in some public classes like FileSystem, 
which is a bigger topic.
 * We have also seen many cases like the one [~mayuehappy] mentioned, so I 
think we could resolve this case first. I'd also like to see us take the ideal 
approach if it is worth doing.

> Improve the availability of Flink when the RocksDB file is corrupted.
> -
>
> Key: FLINK-27681
> URL: https://issues.apache.org/jira/browse/FLINK-27681
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Ming Li
>Assignee: Yue Ma
>Priority: Critical
>  Labels: pull-request-available
> Attachments: image-2023-08-23-15-06-16-717.png
>
>
> We have encountered several times when the RocksDB checksum does not match or 
> the block verification fails when the job is restored. The reason for this 
> situation is generally that there are some problems with the machine where 
> the task is located, which causes the files uploaded to HDFS to be incorrect, 
> but it has been a long time (a dozen minutes to half an hour) when we found 
> this problem. I'm not sure if anyone else has had a similar problem.
> Since this file is referenced by incremental checkpoints for a long time, 
> when the maximum number of checkpoints reserved is exceeded, we can only use 
> this file until it is no longer referenced. When the job failed, it cannot be 
> recovered.
> Therefore we consider:
> 1. Can RocksDB periodically check whether all files are correct and find the 
> problem in time?
> 2. Can Flink automatically roll back to the previous checkpoint when there is 
> a problem with the checkpoint data, because even with manual intervention, it 
> just tries to recover from the existing checkpoint or discard the entire 
> state.
> 3. Can we increase the maximum number of references to a file based on the 
> maximum number of checkpoints reserved? When the number of references exceeds 
> the maximum number of checkpoints -1, the Task side is required to upload a 
> new file for this reference. Not sure if this way will ensure that the new 
> file we upload will be correct.





[jira] [Resolved] (FLINK-30235) Comprehensive benchmarks on changelog checkpointing

2023-11-19 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu resolved FLINK-30235.
--
Fix Version/s: 1.17.0
 Assignee: Rui Xia
   Resolution: Fixed

Resolved this in 1.17.0
See the [blog 
post|https://www.ververica.com/blog/generic-log-based-incremental-checkpoint] 
for more details about the benchmark results.

> Comprehensive benchmarks on changelog checkpointing
> ---
>
> Key: FLINK-30235
> URL: https://issues.apache.org/jira/browse/FLINK-30235
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: Rui Xia
>Assignee: Rui Xia
>Priority: Minor
>  Labels: performance
> Fix For: 1.17.0
>
>
> Changelog checkpointing is functionally usable right now. To make it a 
> production-ready feature, more comprehensive benchmarks are required. In this 
> issue, I aim to answer the following two major concerns:
>  * The expansion of full checkpoint size caused by changelog persistence;
>  * The TPS regression caused by DSTL double-write;
> By the way, I will also present other metrics related to checkpointing.





[jira] [Commented] (FLINK-18473) Optimize RocksDB disk load balancing strategy

2023-11-15 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17786602#comment-17786602
 ] 

Hangxiang Yu commented on FLINK-18473:
--

Thanks [~fanrui] for pinging me here.

[~yigress] It should work for multiple slots within a single TM. But could it 
get worse for multiple slots across multiple TMs?

If so, I would not suggest making it the default.

BTW, we didn't see this problem in our production environment. Could you 
also share your scenarios for using this? Do your jobs have an IO bottleneck on 
one disk, so that you'd like to use multiple HDDs or multiple cloud disks 
like EBS?

If there is a strong demand for sharing multiple disks, I'd also suggest 
considering making them SharedResources, just like memory, as you can see 
in FLINK-29928.

That should be better than introducing another component like ZooKeeper, but it 
is still a bit complex. So let us see whether it is worth doing.

 

> Optimize RocksDB disk load balancing strategy
> -
>
> Key: FLINK-18473
> URL: https://issues.apache.org/jira/browse/FLINK-18473
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.12.0
>Reporter: Rui Fan
>Priority: Minor
>  Labels: auto-deprioritized-major, stale-minor
>
> In general, bigdata servers have many disks. For large-state jobs, if 
> multiple slots are running on a TM, then each slot will create a RocksDB 
> instance. We hope that multiple RocksDB instances use different disks to 
> achieve load balancing.
> h3. The problem of current load balancing strategy:
> When the current RocksDB is initialized, a random value nextDirectory is 
> generated according to the number of RocksDB dir: [code 
> link|https://github.com/apache/flink/blob/2d371eb5ac9a3e485d3665cb9a740c65e2ba2ac6/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java#L441]
> {code:java}
> nextDirectory = new Random().nextInt(initializedDbBasePaths.length);
> {code}
> Different slots generate different RocksDBStateBackend objects, so each slot 
> will generate its own *nextDirectory*. The random algorithm used here, so the 
> random value generated by different slots may be the same. For example: the 
> current RocksDB dir is configured with 10 disks, the *nextDirectory* 
> generated by slot0 and slot1 are both 5, then slot0 and slot1 will use the 
> same disk. This disk will be under a lot of pressure, other disks will not be 
> under pressure.
> h3. Optimization ideas:
> *{{nextDirectory}}* should belong to slot sharing, the initial value of 
> *{{nextDirectory}}* cannot be 0, it is still generated by random. But define 
> *nextDirectory* as +_{{static AtomicInteger()}}_+ and execute 
> +_{{nextDirectory.incrementAndGet()}}_+ every time RocksDBKeyedStateBackend 
> is applied for. 
> {{nextDirectory}} takes the remainder of {{initializedDbBasePaths.length}} to 
> decide which disk to use.
> Is there any problem with the above ideas?
>  
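
The optimization idea quoted above can be sketched roughly as follows. This is 
an illustration under stated assumptions, not the actual RocksDBStateBackend 
code: the class and method names are hypothetical, and the counter is shared 
only within one JVM, so it balances slots inside a single TM and does nothing 
across TMs.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the proposal; not the actual RocksDBStateBackend code.
final class DirectorySelector {
    // Shared by all backends created in this JVM; starts at a random offset.
    private static final AtomicInteger NEXT_DIRECTORY =
            new AtomicInteger(ThreadLocalRandom.current().nextInt(1 << 16));

    private DirectorySelector() {}

    // Returns the index of the base path the next backend should use.
    static int nextIndex(int directoryCount) {
        // floorMod keeps the index non-negative even after the counter overflows.
        return Math.floorMod(NEXT_DIRECTORY.getAndIncrement(), directoryCount);
    }
}
```

With 10 configured directories, 10 consecutive calls in one TM return 10 
distinct indices, whereas independent random draws per slot may collide.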





[jira] [Resolved] (FLINK-26585) State Processor API: Loading a state set buffers the whole state set in memory before starting to process

2023-11-08 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu resolved FLINK-26585.
--
Fix Version/s: 1.19.0
   Resolution: Fixed

merged eb4ae5d4 into master

> State Processor API: Loading a state set buffers the whole state set in 
> memory before starting to process
> -
>
> Key: FLINK-26585
> URL: https://issues.apache.org/jira/browse/FLINK-26585
> Project: Flink
>  Issue Type: Improvement
>  Components: API / State Processor
>Affects Versions: 1.13.0, 1.14.0, 1.15.0
>Reporter: Matthias Schwalbe
>Assignee: Matthias Schwalbe
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
> Attachments: MultiStateKeyIteratorNoStreams.java
>
>
> * When loading a state, MultiStateKeyIterator loads and buffers the whole 
> state in memory before it even processes a single data point 
>  ** This is absolutely no problem for small state (hence the unit tests work 
> fine)
>  ** MultiStateKeyIterator ctor sets up a java Stream that iterates all state 
> descriptors and flattens all datapoints contained within
>  ** The java.util.stream.Stream#flatMap function causes the buffering of the 
> whole data set when enumerated later on
>  ** See call stack [1] 
>  *** In our case this is 150e6 data points (> 1GiB just for the pointers to 
> the data, let alone the data itself ~30GiB)
>  ** I’m not aware of some instrumentation of Stream in order to avoid the 
> problem, hence
>  ** I coded an alternative implementation of MultiStateKeyIterator that 
> avoids using java Stream,
>  ** I can contribute our implementation (MultiStateKeyIteratorNoStreams)
> [1]
> Streams call stack:
> hasNext:77, RocksStateKeysIterator 
> (org.apache.flink.contrib.streaming.state.iterator)
> next:82, RocksStateKeysIterator 
> (org.apache.flink.contrib.streaming.state.iterator)
> forEachRemaining:116, Iterator (java.util)
> forEachRemaining:1801, Spliterators$IteratorSpliterator (java.util)
> forEach:580, ReferencePipeline$Head (java.util.stream)
> accept:270, ReferencePipeline$7$1 (java.util.stream)  
>  #  Stream flatMap(final Function Stream> var1)
> accept:373, ReferencePipeline$11$1 (java.util.stream) 
>  # Stream peek(final Consumer var1)
> accept:193, ReferencePipeline$3$1 (java.util.stream)      
>  #  Stream map(final Function 
> var1)
> tryAdvance:1359, ArrayList$ArrayListSpliterator (java.util)
> lambda$initPartialTraversalState$0:294, 
> StreamSpliterators$WrappingSpliterator (java.util.stream)
> getAsBoolean:-1, 1528195520 
> (java.util.stream.StreamSpliterators$WrappingSpliterator$$Lambda$57)
> fillBuffer:206, StreamSpliterators$AbstractWrappingSpliterator 
> (java.util.stream)
> doAdvance:161, StreamSpliterators$AbstractWrappingSpliterator 
> (java.util.stream)
> tryAdvance:300, StreamSpliterators$WrappingSpliterator (java.util.stream)
> hasNext:681, Spliterators$1Adapter (java.util)
> hasNext:83, MultiStateKeyIterator (org.apache.flink.state.api.input)
> hasNext:162, KeyedStateReaderOperator$NamespaceDecorator 
> (org.apache.flink.state.api.input.operator)
> reachedEnd:215, KeyedStateInputFormat (org.apache.flink.state.api.input)
> invoke:191, DataSourceTask (org.apache.flink.runtime.operators)
> doRun:776, Task (org.apache.flink.runtime.taskmanager)
> run:563, Task (org.apache.flink.runtime.taskmanager)
> run:748, Thread (java.lang)
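
For illustration, a stream-free flattening iterator along the lines described 
above could look like the sketch below. It is not the attached 
MultiStateKeyIteratorNoStreams; the class name and type parameters are 
hypothetical, with `S` standing in for a state descriptor and `T` for a key. 
It opens one inner iterator at a time, so nothing is buffered beyond the 
element currently being returned.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Function;

// Hypothetical stand-in for the idea; not the attached implementation.
final class LazyFlatteningIterator<S, T> implements Iterator<T> {
    private final Iterator<S> outer;
    private final Function<S, Iterator<T>> expand;
    private Iterator<T> current = Collections.emptyIterator();

    LazyFlatteningIterator(Iterator<S> outer, Function<S, Iterator<T>> expand) {
        this.outer = outer;
        this.expand = expand;
    }

    @Override
    public boolean hasNext() {
        // Move to the next non-empty inner iterator, one source at a time.
        while (!current.hasNext() && outer.hasNext()) {
            current = expand.apply(outer.next());
        }
        return current.hasNext();
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return current.next();
    }
}
```

The inner iterator for a source is created only when the previous one is 
exhausted, which is the property the Stream-based version lacks according to 
the call stack above.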





[jira] [Updated] (FLINK-33060) Fix the javadoc of ListState.update/addAll about not allowing null value

2023-11-07 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33060?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu updated FLINK-33060:
-
Fix Version/s: 1.19.0

> Fix the javadoc of ListState.update/addAll about not allowing null value
> 
>
> Key: FLINK-33060
> URL: https://issues.apache.org/jira/browse/FLINK-33060
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> After FLINK-8411, the ListState.update/add/addAll do not allow a null value 
> passed in, while the javadoc says "If null is passed in, the state value will 
> remain unchanged". This should be fixed.
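
The contract the corrected javadoc needs to describe can be illustrated with a 
small in-memory stand-in. This is not Flink's ListState implementation: the 
class name, the messages, and the use of `Objects.requireNonNull` are 
assumptions for illustration. The point is only that a null argument is 
rejected rather than silently leaving the state unchanged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical in-memory stand-in; not Flink's ListState implementation.
final class SimpleListState<T> {
    private final List<T> values = new ArrayList<>();

    // Replaces the state; a null list or a null element is rejected.
    void update(List<T> newValues) {
        requireNoNulls(newValues);
        values.clear();
        values.addAll(newValues);
    }

    // Appends to the state; a null list or a null element is rejected.
    void addAll(List<T> moreValues) {
        requireNoNulls(moreValues);
        values.addAll(moreValues);
    }

    List<T> get() {
        return new ArrayList<>(values);
    }

    // Validates before any mutation so a failed call leaves the state untouched.
    private static <E> void requireNoNulls(List<E> list) {
        Objects.requireNonNull(list, "List of values cannot be null.");
        for (E element : list) {
            Objects.requireNonNull(element, "List elements cannot be null.");
        }
    }
}
```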





[jira] [Resolved] (FLINK-33060) Fix the javadoc of ListState.update/addAll about not allowing null value

2023-11-07 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33060?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu resolved FLINK-33060.
--
Resolution: Fixed

merged e0240c78 into master.

> Fix the javadoc of ListState.update/addAll about not allowing null value
> 
>
> Key: FLINK-33060
> URL: https://issues.apache.org/jira/browse/FLINK-33060
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> After FLINK-8411, the ListState.update/add/addAll do not allow a null value 
> passed in, while the javadoc says "If null is passed in, the state value will 
> remain unchanged". This should be fixed.





[jira] [Updated] (FLINK-33060) Fix the javadoc of ListState.update/addAll about not allowing null value

2023-11-07 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33060?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu updated FLINK-33060:
-
Component/s: Runtime / State Backends

> Fix the javadoc of ListState.update/addAll about not allowing null value
> 
>
> Key: FLINK-33060
> URL: https://issues.apache.org/jira/browse/FLINK-33060
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Minor
>  Labels: pull-request-available
>
> After FLINK-8411, the ListState.update/add/addAll do not allow a null value 
> passed in, while the javadoc says "If null is passed in, the state value will 
> remain unchanged". This should be fixed.





[jira] [Commented] (FLINK-33437) Flink 1.17 sink commited legacy Committable state, but it was not removed from state backend

2023-11-06 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17783134#comment-17783134
 ] 

Hangxiang Yu commented on FLINK-33437:
--

[~dyccode] Thanks for reporting this.

Already assigned to you, please go ahead.

> Flink 1.17 sink commited legacy Committable state, but it was not removed 
> from state backend
> 
>
> Key: FLINK-33437
> URL: https://issues.apache.org/jira/browse/FLINK-33437
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka, Runtime / Checkpointing
>Affects Versions: 1.17.1
> Environment: K8s, Flink 1.17.1
>Reporter: Yuchi Duan
>Assignee: Yuchi Duan
>Priority: Minor
>
> My Flink job graph: kafka source -> process -> kafka sink.
> I used a savepoint to upgrade Flink 1.14.5 to 1.17.1, and the program ran 
> normally. A month later, I restarted the Flink job using a savepoint, and the 
> job started normally. Unfortunately, the Flink job failed every time 
> it did a checkpoint. For example, consider the following scenario:
>  
>  # The program uses Kafka sink
>  # Suspend flink job with savepoint A, and Flink Version is 1.14.x
>  # Recover the job with savepoint A, and Flink Version is 1.17.1
>  # Wait for time longer than {{transactional.id.expiration.ms}} + 
> {{transaction.remove.expired.transaction.cleanup.interval.ms}}
>  # Suspend flink job with savepoint B, and Flink Version is 1.17.1
>  # Recover the job with savepoint B, and Flink Version is 1.17.1
>  # Trigger checkpoint ,the Flink job will fail with the following error:
> {code:java}
> java.io.IOException: Could not perform checkpoint 1009710 for operator 
> kafka-sink: Committer (2/2)#2.
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1256)
>     at 
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
>     at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
>     at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
>     at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488)
>     at 
> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
>     at 
> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
>     at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
>     at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
>     at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
>     at 
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
>     at 
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
>     at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:118)
>     at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>     at java.lang.Thread.run(Thread.java:750)
> Caused by: 
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
> Could not forward element to next operator
>     at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:96)
>     at 

[jira] [Assigned] (FLINK-33437) Flink 1.17 sink commited legacy Committable state, but it was not removed from state backend

2023-11-06 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu reassigned FLINK-33437:


Assignee: Yuchi Duan

> Flink 1.17 sink commited legacy Committable state, but it was not removed 
> from state backend
> 
>
> Key: FLINK-33437
> URL: https://issues.apache.org/jira/browse/FLINK-33437
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka, Runtime / Checkpointing
>Affects Versions: 1.17.1
> Environment: K8s, Flink 1.17.1
>Reporter: Yuchi Duan
>Assignee: Yuchi Duan
>Priority: Minor
>
> My Flink job graph: kafka source -> process -> kafka sink.
> I used a savepoint to upgrade Flink 1.14.5 to 1.17.1, and the program ran 
> normally. A month later, I restarted the Flink job using a savepoint, and the 
> job started normally. Unfortunately, the Flink job failed every time 
> it did a checkpoint. For example, consider the following scenario:
>  
>  # The program uses Kafka sink
>  # Suspend flink job with savepoint A, and Flink Version is 1.14.x
>  # Recover the job with savepoint A, and Flink Version is 1.17.1
>  # Wait for time longer than {{transactional.id.expiration.ms}} + 
> {{transaction.remove.expired.transaction.cleanup.interval.ms}}
>  # Suspend flink job with savepoint B, and Flink Version is 1.17.1
>  # Recover the job with savepoint B, and Flink Version is 1.17.1
>  # Trigger checkpoint ,the Flink job will fail with the following error:
> {code:java}
> java.io.IOException: Could not perform checkpoint 1009710 for operator 
> kafka-sink: Committer (2/2)#2.
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1256)
>     at 
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
>     at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
>     at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
>     at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488)
>     at 
> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
>     at 
> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
>     at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
>     at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
>     at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
>     at 
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
>     at 
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
>     at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:118)
>     at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>     at java.lang.Thread.run(Thread.java:750)
> Caused by: 
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
> Could not forward element to next operator
>     at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:96)
>     at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:75)

[jira] [Updated] (FLINK-5865) Throw original exception in states

2023-11-05 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-5865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu updated FLINK-5865:

Fix Version/s: 1.19.0

> Throw original exception in states
> --
>
> Key: FLINK-5865
> URL: https://issues.apache.org/jira/browse/FLINK-5865
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.3.0
>Reporter: Xiaogang Shi
>Assignee: Zakelly Lan
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> auto-unassigned, pull-request-available
> Fix For: 1.19.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently, all exceptions thrown in RocksDB states are converted to 
> {{RuntimeException}}. This is unnecessary and prints useless stack traces in 
> the log.
> I think it's better to throw the original exception, without any wrapping.
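The difference the description points at can be sketched with plain JDK classes. `WrappingDemoState` and its methods are hypothetical stand-ins for illustration, not Flink's state API, and the `IOException` merely simulates a backend failure:

```java
import java.io.IOException;

/** Hypothetical stand-in for a state object; not Flink's actual state API. */
class WrappingDemoState {
    /** Simulates a failure inside the storage backend. */
    private static void failingBackendCall() throws IOException {
        throw new IOException("backend read failed");
    }

    /** Wrapping style: the caller sees a RuntimeException and has to unwrap the cause. */
    static String getWrapped() {
        try {
            failingBackendCall();
            return "value";
        } catch (IOException e) {
            throw new RuntimeException("Error while reading state", e);
        }
    }

    /** Pass-through style: the original exception reaches the caller unchanged. */
    static String getOriginal() throws IOException {
        failingBackendCall();
        return "value";
    }

    public static void main(String[] args) {
        try {
            getWrapped();
        } catch (RuntimeException e) {
            System.out.println("wrapped, real cause hidden one level down: " + e.getCause());
        }
        try {
            getOriginal();
        } catch (IOException e) {
            System.out.println("original: " + e);
        }
    }
}
```

With the pass-through style the log shows one stack trace for the real failure instead of a wrapper plus a "Caused by" section.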





[jira] [Resolved] (FLINK-5865) Throw original exception in states

2023-11-05 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-5865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu resolved FLINK-5865.
-
Resolution: Fixed

merged cee2611e into master.

About 
[FLIP-368|https://cwiki.apache.org/confluence/display/FLINK/FLIP-368%3A+Reorganize+the+exceptions+thrown+in+state+interfaces]: 
let's handle it in a new ticket.

> Throw original exception in states
> --
>
> Key: FLINK-5865
> URL: https://issues.apache.org/jira/browse/FLINK-5865
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.3.0
>Reporter: Xiaogang Shi
>Assignee: Zakelly Lan
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> auto-unassigned, pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently, all exceptions thrown in RocksDB states are converted to 
> {{RuntimeException}}. This is unnecessary and prints useless stack traces in 
> the log.
> I think it's better to throw the original exception, without any wrapping.





[jira] [Assigned] (FLINK-5865) Throw original exception in states

2023-11-05 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-5865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu reassigned FLINK-5865:
---

Assignee: Zakelly Lan

> Throw original exception in states
> --
>
> Key: FLINK-5865
> URL: https://issues.apache.org/jira/browse/FLINK-5865
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.3.0
>Reporter: Xiaogang Shi
>Assignee: Zakelly Lan
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> auto-unassigned, pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently, all exceptions thrown in RocksDB states are converted to 
> {{RuntimeException}}. This is unnecessary and prints useless stack traces in 
> the log.
> I think it's better to throw the original exception, without any wrapping.





[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.

2023-11-02 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17782014#comment-17782014
 ] 

Hangxiang Yu commented on FLINK-27681:
--

[~mayuehappy] Thanks for the reminder. I think it's doable. 
Just assigned to you. Please go ahead.

Just as we discussed: we could introduce a new configuration option to enable 
this, and we could verify only the incremental SST files.
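The idea in this comment (an opt-in check that covers only newly uploaded files) might look roughly like the sketch below. Everything here is an assumption for illustration: `UploadVerifier`, the `verifyEnabled` switch, and the use of a plain CRC32 over the file bytes are not taken from the ticket, and the eventual implementation could rely on RocksDB's own checksum facilities instead.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.zip.CRC32;

/** Hypothetical helper; names and the CRC32 choice are illustrative assumptions. */
class UploadVerifier {
    /** Computes a CRC32 over the whole file, reading it in 8 KiB chunks. */
    static long crc32Of(Path file) throws IOException {
        CRC32 crc = new CRC32();
        byte[] buf = new byte[8192];
        try (InputStream in = Files.newInputStream(file)) {
            int n;
            while ((n = in.read(buf)) != -1) {
                crc.update(buf, 0, n);
            }
        }
        return crc.getValue();
    }

    /**
     * Compares a newly created local file with its uploaded copy.
     * Returns true when verification is switched off or the checksums match.
     */
    static boolean verifyUploaded(Path local, Path uploaded, boolean verifyEnabled)
            throws IOException {
        if (!verifyEnabled) {
            return true; // feature is opt-in, so skipping the check counts as success
        }
        return crc32Of(local) == crc32Of(uploaded);
    }
}
```

Restricting the check to files created since the last checkpoint keeps the extra read cost proportional to the increment rather than to the full state size.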

> Improve the availability of Flink when the RocksDB file is corrupted.
> -
>
> Key: FLINK-27681
> URL: https://issues.apache.org/jira/browse/FLINK-27681
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Ming Li
>Priority: Critical
> Attachments: image-2023-08-23-15-06-16-717.png
>
>
> Several times, when restoring a job, we have seen the RocksDB checksum not 
> match or the block verification fail. The usual cause is a problem with the 
> machine where the task runs, which leads to incorrect files being uploaded to 
> HDFS, but by the time we noticed the problem a long time had passed (a dozen 
> minutes to half an hour). I'm not sure if anyone else has had a similar problem.
> Since such a file is referenced by incremental checkpoints for a long time, 
> once the maximum number of retained checkpoints is exceeded we have no choice 
> but to keep using this file until it is no longer referenced. When the job 
> fails, it cannot be recovered.
> Therefore we consider:
> 1. Can RocksDB periodically check whether all files are correct, so that the 
> problem is found in time?
> 2. Can Flink automatically roll back to the previous checkpoint when there is 
> a problem with the checkpoint data? Even with manual intervention, one can 
> only try to recover from an existing checkpoint or discard the entire state.
> 3. Can we add a maximum number of references to a file, based on the maximum 
> number of retained checkpoints? When the number of references exceeds the 
> maximum number of checkpoints - 1, the task side would be required to upload 
> a new file for this reference. We are not sure whether this would ensure that 
> the newly uploaded file is correct.





[jira] [Assigned] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.

2023-11-02 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu reassigned FLINK-27681:


Assignee: Yue Ma

> Improve the availability of Flink when the RocksDB file is corrupted.
> -
>
> Key: FLINK-27681
> URL: https://issues.apache.org/jira/browse/FLINK-27681
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Ming Li
>Assignee: Yue Ma
>Priority: Critical
> Attachments: image-2023-08-23-15-06-16-717.png
>
>
> Several times, when restoring a job, we have seen the RocksDB checksum not 
> match or the block verification fail. The usual cause is a problem with the 
> machine where the task runs, which leads to incorrect files being uploaded to 
> HDFS, but by the time we noticed the problem a long time had passed (a dozen 
> minutes to half an hour). I'm not sure if anyone else has had a similar problem.
> Since such a file is referenced by incremental checkpoints for a long time, 
> once the maximum number of retained checkpoints is exceeded we have no choice 
> but to keep using this file until it is no longer referenced. When the job 
> fails, it cannot be recovered.
> Therefore we consider:
> 1. Can RocksDB periodically check whether all files are correct, so that the 
> problem is found in time?
> 2. Can Flink automatically roll back to the previous checkpoint when there is 
> a problem with the checkpoint data? Even with manual intervention, one can 
> only try to recover from an existing checkpoint or discard the entire state.
> 3. Can we add a maximum number of references to a file, based on the maximum 
> number of retained checkpoints? When the number of references exceeds the 
> maximum number of checkpoints - 1, the task side would be required to upload 
> a new file for this reference. We are not sure whether this would ensure that 
> the newly uploaded file is correct.





[jira] [Resolved] (FLINK-33122) [Benchmark] Null checkpoint directory in rescaling benchmarks

2023-10-20 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu resolved FLINK-33122.
--
Resolution: Fixed

> [Benchmark] Null checkpoint directory in rescaling benchmarks
> -
>
> Key: FLINK-33122
> URL: https://issues.apache.org/jira/browse/FLINK-33122
> Project: Flink
>  Issue Type: Bug
>  Components: Benchmarks
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> Currently, when setting up a rescaling benchmark, a local checkpoint storage 
> is created based on a local path configured by "benchmark.state.data-dir". 
> When the user does not provide a value for this option, an exception is 
> thrown. In this case, the right behavior would be to create a temporary 
> directory for checkpoints, just as 
> _StateBackendBenchmarkUtils#createKeyedStateBackend_ does for the local data 
> directory.
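The fallback described above can be sketched with the JDK alone. `CheckpointDirResolver`, its `resolve` method, and the `benchmark-checkpoint-` prefix are hypothetical names chosen for this illustration; the actual benchmark code may be structured differently.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical helper illustrating "use the configured path, else a temp directory". */
class CheckpointDirResolver {
    /**
     * Returns the configured directory when one is given; otherwise creates and
     * returns a fresh temporary directory instead of failing on the null value.
     */
    static Path resolve(String configuredDir) {
        if (configuredDir != null && !configuredDir.isEmpty()) {
            return Paths.get(configuredDir);
        }
        try {
            return Files.createTempDirectory("benchmark-checkpoint-");
        } catch (IOException e) {
            throw new UncheckedIOException("Could not create temporary checkpoint directory", e);
        }
    }

    public static void main(String[] args) {
        // No option set: a temporary directory is created on demand.
        System.out.println(resolve(null));
    }
}
```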





[jira] [Updated] (FLINK-33122) [Benchmark] Null checkpoint directory in rescaling benchmarks

2023-10-20 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu updated FLINK-33122:
-
Fix Version/s: 1.19.0

> [Benchmark] Null checkpoint directory in rescaling benchmarks
> -
>
> Key: FLINK-33122
> URL: https://issues.apache.org/jira/browse/FLINK-33122
> Project: Flink
>  Issue Type: Bug
>  Components: Benchmarks
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> Currently, when setting up a rescaling benchmark, a local checkpoint storage 
> is created based on a local path configured by "benchmark.state.data-dir". 
> When the user does not provide a value for this option, an exception is 
> thrown. In this case, the right behavior would be to create a temporary 
> directory for checkpoints, just as 
> _StateBackendBenchmarkUtils#createKeyedStateBackend_ does for the local data 
> directory.





[jira] [Commented] (FLINK-33122) [Benchmark] Null checkpoint directory in rescaling benchmarks

2023-10-20 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1663#comment-1663
 ] 

Hangxiang Yu commented on FLINK-33122:
--

merged into master via ddaafee5

> [Benchmark] Null checkpoint directory in rescaling benchmarks
> -
>
> Key: FLINK-33122
> URL: https://issues.apache.org/jira/browse/FLINK-33122
> Project: Flink
>  Issue Type: Bug
>  Components: Benchmarks
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
>  Labels: pull-request-available
>
> Currently, when setting up a rescaling benchmark, a local checkpoint storage 
> is created based on a local path configured by "benchmark.state.data-dir". 
> When the user does not provide a value for this option, an exception is 
> thrown. In this case, the right behavior would be to create a temporary 
> directory for checkpoints, just as 
> _StateBackendBenchmarkUtils#createKeyedStateBackend_ does for the local data 
> directory.





[jira] [Updated] (FLINK-30863) Register local recovery files of changelog before notifyCheckpointComplete()

2023-10-20 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu updated FLINK-30863:
-
Fix Version/s: 1.19.0

> Register local recovery files of changelog before notifyCheckpointComplete()
> 
>
> Key: FLINK-30863
> URL: https://issues.apache.org/jira/browse/FLINK-30863
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.17.0
>Reporter: Yanfei Lei
>Assignee: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
> Attachments: tm-log_fail_cl_local_recovery.txt
>
>
> If the TM materializes before receiving confirm(), the previously uploaded 
> queue in `FsStateChangelogWriter` is cleared, so the local files of the 
> completed checkpoint are not registered again. The JM-owned files, however, 
> are registered before confirm() and do not depend on the uploaded queue. As a 
> result, the local files are deleted while the DFS files are still there. 
>  
> We have encountered the following situation: the job cannot find the local 
> recovery files, but can restore from the DFS files:
> {code:java}
> 2023-01-18 17:21:13,412 [SlidingProcessingTimeWindows (37/48)#1] INFO  
> org.apache.flink.runtime.taskmanager.Task                    [] - 
> SlidingProcessingTimeWindows (37/48)#1 #1 (fa12cfa3b811a351e031b036b0e85d91) 
> switched from DEPLOYING to INITIALIZING.
> 2023-01-18 17:21:13,440 [SlidingProcessingTimeWindows (37/48)#1] INFO  
> org.apache.flink.runtime.state.TaskLocalStateStoreImpl       [] - Found 
> registered local state for checkpoint 11599 in subtask 
> (2daf1d9bc9ed40ecb191303db813b0de - 0a448493b4782967b150582570326227 - 36) : 
> TaskOperatorSubtaskStates{subtaskStatesByOperatorID={0a448493b4782967b150582570326227=SubtaskState{operatorStateFromBackend=StateObjectCollection{[]},
>  operatorStateFromStream=StateObjectCollection{[]}, 
> keyedStateFromBackend=StateObjectCollection{[org.apache.flink.runtime.state.changelog.ChangelogStateBackendLocalHandle@38aa46db]},
>  keyedStateFromStream=StateObjectCollection{[]}, 
> inputChannelState=StateObjectCollection{[]}, 
> resultSubpartitionState=StateObjectCollection{[]}, stateSize=1764644202, 
> checkpointedSize=1997682}}, isTaskDeployedAsFinished=false, 
> isTaskFinished=false}
> 2023-01-18 17:21:13,442 [SlidingProcessingTimeWindows (37/48)#1] INFO  
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - 
> Getting managed memory shared cache for RocksDB.
> 2023-01-18 17:21:13,446 [SlidingProcessingTimeWindows (37/48)#1] INFO  
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - 
> Obtained shared RocksDB cache of size 1438814063 bytes
> 2023-01-18 17:21:13,447 [SlidingProcessingTimeWindows (37/48)#1] INFO  
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation
>  [] - Starting to restore from state handle: 
> IncrementalLocalKeyedStateHandle{metaDataState=File State: 
> file:/opt/flink/flink-tmp-dir/tm_job-2daf1d9b-c9ed-40ec-b191-303db813b0de-taskmanager-1-31/localState/aid_45af7e6b612dad10b60554d81323d5f3/jid_2daf1d9bc9ed40ecb191303db813b0de/vtx_0a448493b4782967b150582570326227_sti_36/chk_125/0d082666-bd31-4ebe-9977-545c0d9b18a5
>  [1187 bytes]} 
> DirectoryKeyedStateHandle{directoryStateHandle=DirectoryStateHandle{directory=/opt/flink/flink-tmp-dir/tm_job-2daf1d9b-c9ed-40ec-b191-303db813b0de-taskmanager-1-31/localState/aid_45af7e6b612dad10b60554d81323d5f3/jid_2daf1d9bc9ed40ecb191303db813b0de/vtx_0a448493b4782967b150582570326227_sti_36/chk_125/b3e1d20f164d4c5baed291f5d1224183},
>  keyGroupRange=KeyGroupRange{startKeyGroup=96, endKeyGroup=98}} without 
> rescaling.
> 2023-01-18 17:21:13,495 [SlidingProcessingTimeWindows (37/48)#1] INFO  
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation
>  [] - Finished restoring from state handle: 
> IncrementalLocalKeyedStateHandle{metaDataState=File State: 
> file:/opt/flink/flink-tmp-dir/tm_job-2daf1d9b-c9ed-40ec-b191-303db813b0de-taskmanager-1-31/localState/aid_45af7e6b612dad10b60554d81323d5f3/jid_2daf1d9bc9ed40ecb191303db813b0de/vtx_0a448493b4782967b150582570326227_sti_36/chk_125/0d082666-bd31-4ebe-9977-545c0d9b18a5
>  [1187 bytes]} 
> DirectoryKeyedStateHandle{directoryStateHandle=DirectoryStateHandle{directory=/opt/flink/flink-tmp-dir/tm_job-2daf1d9b-c9ed-40ec-b191-303db813b0de-taskmanager-1-31/localState/aid_45af7e6b612dad10b60554d81323d5f3/jid_2daf1d9bc9ed40ecb191303db813b0de/vtx_0a448493b4782967b150582570326227_sti_36/chk_125/b3e1d20f164d4c5baed291f5d1224183},
>  keyGroupRange=KeyGroupRange{startKeyGroup=96, endKeyGroup=98}} without 
> rescaling.
> 2023-01-18 17:21:13,495 

[jira] [Assigned] (FLINK-33156) Remove flakiness from tests in OperatorStateBackendTest.java

2023-09-28 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu reassigned FLINK-33156:


Assignee: Asha Boyapati

> Remove flakiness from tests in OperatorStateBackendTest.java
> 
>
> Key: FLINK-33156
> URL: https://issues.apache.org/jira/browse/FLINK-33156
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.17.1
>Reporter: Asha Boyapati
>Assignee: Asha Boyapati
>Priority: Minor
> Fix For: 1.17.1
>
>
> This issue is similar to:
> https://issues.apache.org/jira/browse/FLINK-32963
> We are proposing to make the following tests stable:
> {quote}org.apache.flink.runtime.state.OperatorStateBackendTest#testSnapshotRestoreSync
> org.apache.flink.runtime.state.OperatorStateBackendTest#testSnapshotRestoreAsync{quote}
> The tests are currently flaky because the order of elements returned by 
> iterators is non-deterministic.
> The following PR fixes the flaky tests by making them independent of the order 
> of elements returned by the iterator:
> https://github.com/apache/flink/pull/23464
> We detected this with the NonDex tool, using the following commands:
> {quote}mvn edu.illinois:nondex-maven-plugin:2.1.1:nondex -pl flink-runtime 
> -DnondexRuns=10 
> -Dtest=org.apache.flink.runtime.state.OperatorStateBackendTest#testSnapshotRestoreSync
> mvn edu.illinois:nondex-maven-plugin:2.1.1:nondex -pl flink-runtime 
> -DnondexRuns=10 
> -Dtest=org.apache.flink.runtime.state.OperatorStateBackendTest#testSnapshotRestoreAsync{quote}
> Please see the following Continuous Integration log that shows the flakiness:
> https://github.com/asha-boyapati/flink/actions/runs/6193757385
> Please see the following Continuous Integration log that shows that the 
> flakiness is fixed by this change:
> https://github.com/asha-boyapati/flink/actions/runs/619409
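One generic way to make an assertion independent of iteration order is to compare sorted copies (or multisets) rather than the raw sequence. The sketch below only illustrates that technique; `OrderInsensitiveCheck` is a hypothetical helper, and it is not claimed to be what the linked PR does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

/** Hypothetical test helper: compares iterator contents while ignoring their order. */
class OrderInsensitiveCheck {
    /** Drains the iterator into a list and sorts it. */
    static <T extends Comparable<T>> List<T> sortedCopy(Iterator<T> it) {
        List<T> out = new ArrayList<>();
        while (it.hasNext()) {
            out.add(it.next());
        }
        Collections.sort(out);
        return out;
    }

    /** True if both sides hold the same elements with the same multiplicities. */
    static <T extends Comparable<T>> boolean sameElements(Iterator<T> actual, List<T> expected) {
        List<T> sortedExpected = new ArrayList<>(expected);
        Collections.sort(sortedExpected);
        return sortedCopy(actual).equals(sortedExpected);
    }

    public static void main(String[] args) {
        // The iteration order differs from the expected order, yet the check passes.
        System.out.println(sameElements(Arrays.asList(3, 1, 2).iterator(), Arrays.asList(1, 2, 3)));
    }
}
```

Sorting keeps duplicates significant, which a plain `Set` comparison would silently drop.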





[jira] [Commented] (FLINK-26586) FileSystem uses unbuffered read I/O

2023-09-28 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17769942#comment-17769942
 ] 

Hangxiang Yu commented on FLINK-26586:
--

{quote} * it can not be configured (enabling/buffer size){quote}
IMO, this could be enabled by default for whoever uses it; at least I haven't 
seen much cost so far.

But it would be better if we could support configuring the buffer size.
{quote} * I replace the 'file:' plugin with my wrapped one, which for a general 
solution is not very elegant{quote}
I think it could be a general solution.
We could implement a buffered one and then use it in the places where it is 
necessary.

I'm not clear about what your 'wrapped one' means, so maybe just link your PR 
here if you think it could work now :)
{quote} * I simply allocate the buffer from heap instead of integrating Flink 
buffer management{quote}
That's okay for me, at least for this case (the restore procedure). I think we 
could just use heap first.
{quote}my implementation is a façade to potentially all filesystem 
implementations, I think only the local filesystem implementation needs it, so 
we could also map to the Java buffered local I/O implementation instead of 
using java.io.FileInputStream
{quote}
Some filesystems such as HDFS and S3 should also have their own internal buffer 
implementations, IIUC.
I am not sure whether just mapping to a Java buffered stream could meet the 
requirement. [~fanrui] Could you share something about this? I saw you have 
implemented a new buffered one before.
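To make the cost being discussed concrete, here is a small JDK-only sketch that counts how many read calls reach the underlying stream with and without a heap-allocated buffer of configurable size. `CountingStream` and `BufferedReadDemo` are hypothetical names for this illustration and are unrelated to the classes attached to the ticket.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical wrapper that counts read calls reaching the wrapped stream. */
class CountingStream extends InputStream {
    private final InputStream delegate;
    int calls; // every call counted here would be a system call on a real file stream

    CountingStream(InputStream delegate) {
        this.delegate = delegate;
    }

    @Override
    public int read() throws IOException {
        calls++;
        return delegate.read();
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        calls++;
        return delegate.read(b, off, len);
    }
}

class BufferedReadDemo {
    /** Reads eight single bytes and reports how many calls hit the underlying stream. */
    static int underlyingCalls(boolean buffered, int bufferSize) throws IOException {
        CountingStream raw = new CountingStream(new ByteArrayInputStream(new byte[64]));
        // The buffer lives on the heap and its size is a plain constructor argument.
        InputStream in = buffered ? new BufferedInputStream(raw, bufferSize) : raw;
        for (int i = 0; i < 8; i++) {
            in.read();
        }
        return raw.calls;
    }

    public static void main(String[] args) throws IOException {
        System.out.println("unbuffered calls: " + underlyingCalls(false, 0));
        System.out.println("buffered calls:   " + underlyingCalls(true, 4096));
    }
}
```

In this sketch the unbuffered variant reaches the underlying stream eight times, while the buffered one needs a single bulk read to serve the same eight bytes.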

 

> FileSystem uses unbuffered read I/O
> ---
>
> Key: FLINK-26586
> URL: https://issues.apache.org/jira/browse/FLINK-26586
> Project: Flink
>  Issue Type: Improvement
>  Components: API / State Processor, Connectors / FileSystem, Runtime 
> / Checkpointing
>Affects Versions: 1.13.0, 1.14.0
>Reporter: Matthias Schwalbe
>Priority: Major
> Attachments: BufferedFSDataInputStreamWrapper.java, 
> BufferedLocalFileSystem.java
>
>
> - I found out that, at least when using LocalFileSystem on a windows system, 
> read I/O to load a savepoint is unbuffered,
>  - See example stack [1]
>  - i.e. in order to load only a long in a serializer, it needs to go into 
> kernel mode 8 times and load the 8 bytes one by one
>  - I coded a BufferedFSDataInputStreamWrapper that allows to opt-in buffered 
> reads on any FileSystem implementation
>  - In our setting savepoint load is now 30 times faster
>  - I’ve once seen a Jira ticket as to improve savepoint load time in general 
> (lost the link unfortunately), maybe this approach can help with it
>  - not sure if HDFS has got the same problem
>  - I can contribute my implementation of a BufferedFSDataInputStreamWrapper 
> which can be integrated in any 
> [1] unbuffered reads stack:
> read:207, FileInputStream (java.io)
> read:68, LocalDataInputStream (org.apache.flink.core.fs.local)
> read:50, FSDataInputStreamWrapper (org.apache.flink.core.fs)
> read:42, ForwardingInputStream (org.apache.flink.runtime.util)
> readInt:390, DataInputStream (java.io)
> deserialize:80, BytePrimitiveArraySerializer 
> (org.apache.flink.api.common.typeutils.base.array)
> next:298, FullSnapshotRestoreOperation$KeyGroupEntriesIterator 
> (org.apache.flink.runtime.state.restore)
> next:273, FullSnapshotRestoreOperation$KeyGroupEntriesIterator 
> (org.apache.flink.runtime.state.restore)
> restoreKVStateData:147, RocksDBFullRestoreOperation 
> (org.apache.flink.contrib.streaming.state.restore)





[jira] [Commented] (FLINK-33127) HeapKeyedStateBackend: use buffered I/O to speed up local recovery

2023-09-25 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17768610#comment-17768610
 ] 

Hangxiang Yu commented on FLINK-33127:
--

Actually, I did a second review last month but have not received your response 
so far.

Of course, I'm fine with focusing on FLINK-26585 first.

Just a kind ping about the duplicate ticket.

> HeapKeyedStateBackend: use buffered I/O to speed up local recovery
> --
>
> Key: FLINK-33127
> URL: https://issues.apache.org/jira/browse/FLINK-33127
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Yangyang ZHANG
>Assignee: Yangyang ZHANG
>Priority: Major
> Attachments: thread_dump.png
>
>
> Recently, I observed a slow restore in local recovery using the hashmap 
> state backend.
> It took 147 seconds to restore from a 467MB snapshot, 9 times slower than 
> restoring from the remote fs (16s).
> The thread dump shows that it reads the local snapshot file directly through 
> an unbuffered FileInputStream / fs.local.LocalDataInputStream.
> !thread_dump.png!
> Maybe we can wrap it with BufferedInputStream to speed up local recovery.





[jira] [Commented] (FLINK-26585) State Processor API: Loading a state set buffers the whole state set in memory before starting to process

2023-09-25 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17768607#comment-17768607
 ] 

Hangxiang Yu commented on FLINK-26585:
--

[~Matthias Schwalbe] I did a second review last month but have not received a 
response from you.

You could check my newest comments from last month.

> State Processor API: Loading a state set buffers the whole state set in 
> memory before starting to process
> -
>
> Key: FLINK-26585
> URL: https://issues.apache.org/jira/browse/FLINK-26585
> Project: Flink
>  Issue Type: Improvement
>  Components: API / State Processor
>Affects Versions: 1.13.0, 1.14.0, 1.15.0
>Reporter: Matthias Schwalbe
>Assignee: Matthias Schwalbe
>Priority: Major
>  Labels: pull-request-available
> Attachments: MultiStateKeyIteratorNoStreams.java
>
>
> * When loading a state, MultiStateKeyIterator loads and buffers the whole 
> state in memory before it even processes a single data point 
>  ** This is absolutely no problem for small state (hence the unit tests work 
> fine)
>  ** MultiStateKeyIterator ctor sets up a java Stream that iterates all state 
> descriptors and flattens all datapoints contained within
>  ** The java.util.stream.Stream#flatMap function causes the buffering of the 
> whole data set when enumerated later on
>  ** See call stack [1] 
>  *** In our case this is 150e6 data points (> 1GiB just for the pointers to 
> the data, let alone the data itself ~30GiB)
>  ** I’m not aware of some instrumentation of Stream in order to avoid the 
> problem, hence
>  ** I coded an alternative implementation of MultiStateKeyIterator that 
> avoids using java Stream,
>  ** I can contribute our implementation (MultiStateKeyIteratorNoStreams)
> [1]
> Streams call stack:
> hasNext:77, RocksStateKeysIterator 
> (org.apache.flink.contrib.streaming.state.iterator)
> next:82, RocksStateKeysIterator 
> (org.apache.flink.contrib.streaming.state.iterator)
> forEachRemaining:116, Iterator (java.util)
> forEachRemaining:1801, Spliterators$IteratorSpliterator (java.util)
> forEach:580, ReferencePipeline$Head (java.util.stream)
> accept:270, ReferencePipeline$7$1 (java.util.stream)  
>  #  Stream flatMap(final Function Stream> var1)
> accept:373, ReferencePipeline$11$1 (java.util.stream) 
>  # Stream peek(final Consumer var1)
> accept:193, ReferencePipeline$3$1 (java.util.stream)      
>  #  Stream map(final Function 
> var1)
> tryAdvance:1359, ArrayList$ArrayListSpliterator (java.util)
> lambda$initPartialTraversalState$0:294, 
> StreamSpliterators$WrappingSpliterator (java.util.stream)
> getAsBoolean:-1, 1528195520 
> (java.util.stream.StreamSpliterators$WrappingSpliterator$$Lambda$57)
> fillBuffer:206, StreamSpliterators$AbstractWrappingSpliterator 
> (java.util.stream)
> doAdvance:161, StreamSpliterators$AbstractWrappingSpliterator 
> (java.util.stream)
> tryAdvance:300, StreamSpliterators$WrappingSpliterator (java.util.stream)
> hasNext:681, Spliterators$1Adapter (java.util)
> hasNext:83, MultiStateKeyIterator (org.apache.flink.state.api.input)
> hasNext:162, KeyedStateReaderOperator$NamespaceDecorator 
> (org.apache.flink.state.api.input.operator)
> reachedEnd:215, KeyedStateInputFormat (org.apache.flink.state.api.input)
> invoke:191, DataSourceTask (org.apache.flink.runtime.operators)
> doRun:776, Task (org.apache.flink.runtime.taskmanager)
> run:563, Task (org.apache.flink.runtime.taskmanager)
> run:748, Thread (java.lang)
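The kind of Stream-free alternative the description mentions can be sketched as a hand-written flattening iterator that opens one inner iterator at a time and pulls elements only on demand. `LazyFlatteningIterator` is a hypothetical class written for this illustration; it is not the attached MultiStateKeyIteratorNoStreams.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Function;

/**
 * Hypothetical sketch: flattens one inner iterator per source element lazily,
 * so no inner iterator is drained before its elements are actually requested.
 */
class LazyFlatteningIterator<S, T> implements Iterator<T> {
    private final Iterator<S> sources;
    private final Function<S, Iterator<T>> expand;
    private Iterator<T> current = Collections.emptyIterator();

    LazyFlatteningIterator(Iterator<S> sources, Function<S, Iterator<T>> expand) {
        this.sources = sources;
        this.expand = expand;
    }

    @Override
    public boolean hasNext() {
        // Advance to the next non-empty inner iterator, opening them one at a time.
        while (!current.hasNext() && sources.hasNext()) {
            current = expand.apply(sources.next());
        }
        return current.hasNext();
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return current.next();
    }
}
```

Here `sources` would play the role of the state descriptors and `expand` the per-descriptor key iterator; memory use stays bounded by a single inner iterator regardless of how many data points the state holds.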





[jira] [Commented] (FLINK-33127) HeapKeyedStateBackend: use buffered I/O to speed up local recovery

2023-09-24 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17768481#comment-17768481
 ] 

Hangxiang Yu commented on FLINK-33127:
--

IIUC, it's a duplicate of FLINK-26586 and FLINK-19911.

So just a kind ping: [~Matthias Schwalbe] Are you still working on FLINK-26586?

> HeapKeyedStateBackend: use buffered I/O to speed up local recovery
> --
>
> Key: FLINK-33127
> URL: https://issues.apache.org/jira/browse/FLINK-33127
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Yangyang ZHANG
>Assignee: Yangyang ZHANG
>Priority: Major
> Attachments: thread_dump.png
>
>
> Recently, I observed a slow restore in local recovery using the hashmap 
> state backend.
> It took 147 seconds to restore from a 467MB snapshot, 9 times slower than 
> restoring from the remote fs (16s).
> The thread dump shows that it reads the local snapshot file directly through 
> an unbuffered FileInputStream / fs.local.LocalDataInputStream.
> !thread_dump.png!
> Maybe we can wrap it with BufferedInputStream to speed up local recovery.





[jira] [Commented] (FLINK-33090) CheckpointsCleaner clean individual checkpoint states in parallel

2023-09-17 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17766182#comment-17766182
 ] 

Hangxiang Yu commented on FLINK-33090:
--

Sure. Thanks for the PR.
I will take a look ASAP.

> CheckpointsCleaner clean individual checkpoint states in parallel
> -
>
> Key: FLINK-33090
> URL: https://issues.apache.org/jira/browse/FLINK-33090
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.17.1
>Reporter: Yi Zhang
>Assignee: Yi Zhang
>Priority: Major
>  Labels: pull-request-available
>
> Currently, CheckpointsCleaner cleans multiple checkpoints in parallel with 
> the JobManager's ioExecutor; however, the states of each checkpoint are 
> cleaned sequentially. With thousands of StateObjects to clean, this can take 
> a long time on some checkpoint storage, and if it takes longer than the 
> checkpoint interval it prevents new checkpoints.
> The proposal is to use the same ioExecutor to clean up each checkpoint's 
> states in parallel as well. From my local testing, with default settings for 
> the ioExecutor thread pool and xK state files, this can reduce the cleanup 
> time from 10 minutes to <1 minute. 
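The proposal can be sketched with `CompletableFuture` and a shared executor. `ParallelDiscard`, `discardAll`, and the use of `Runnable` as a stand-in for discarding a single state object are assumptions made for this illustration; they do not reflect the actual CheckpointsCleaner code or the submitted PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: discard the state objects of one checkpoint in parallel. */
class ParallelDiscard {
    /**
     * Submits one task per discard action to the given executor and returns a
     * future that completes once every action has finished.
     */
    static CompletableFuture<Void> discardAll(List<Runnable> discardActions, Executor ioExecutor) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (Runnable action : discardActions) {
            futures.add(CompletableFuture.runAsync(action, ioExecutor));
        }
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        AtomicInteger discarded = new AtomicInteger();
        List<Runnable> actions = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            actions.add(discarded::incrementAndGet); // stands in for deleting one state file
        }
        discardAll(actions, pool).join();
        pool.shutdown();
        System.out.println("discarded " + discarded.get() + " state objects");
    }
}
```

Returning a combined future lets the caller decide when to wait, so the checkpoint itself is only considered cleaned after all of its state objects have been discarded.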





[jira] [Assigned] (FLINK-33090) CheckpointsCleaner clean individual checkpoint states in parallel

2023-09-15 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu reassigned FLINK-33090:


Assignee: Yi Zhang

> CheckpointsCleaner clean individual checkpoint states in parallel
> -
>
> Key: FLINK-33090
> URL: https://issues.apache.org/jira/browse/FLINK-33090
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.17.1
>Reporter: Yi Zhang
>Assignee: Yi Zhang
>Priority: Major
>
> Currently, CheckpointsCleaner cleans multiple checkpoints in parallel with 
> the JobManager's ioExecutor; however, the states of each checkpoint are 
> cleaned sequentially. With thousands of StateObjects to clean, this can take 
> a long time on some checkpoint storage, and if it takes longer than the 
> checkpoint interval it prevents new checkpoints.
> The proposal is to use the same ioExecutor to clean up each checkpoint's 
> states in parallel as well. From my local testing, with default settings for 
> the ioExecutor thread pool and xK state files, this can reduce the cleanup 
> time from 10 minutes to <1 minute. 





[jira] [Commented] (FLINK-33090) CheckpointsCleaner clean individual checkpoint states in parallel

2023-09-14 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17765423#comment-17765423
 ] 

Hangxiang Yu commented on FLINK-33090:
--

Thanks for the proposal.

We also saw a related problem in FLINK-26590, which I just linked.

Would you like to contribute your code?

> CheckpointsCleaner clean individual checkpoint states in parallel
> -
>
> Key: FLINK-33090
> URL: https://issues.apache.org/jira/browse/FLINK-33090
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.17.1
>Reporter: Yi Zhang
>Priority: Major
>
> Currently, CheckpointsCleaner cleans multiple checkpoints in parallel with 
> the JobManager's ioExecutor; however, the states of each checkpoint are 
> cleaned sequentially. With thousands of StateObjects to clean, this can take 
> a long time on some checkpoint storage, and if it takes longer than the 
> checkpoint interval, it prevents new checkpoints.
> The proposal is to use the same ioExecutor to clean up each checkpoint's 
> states in parallel as well. From my local testing, with default settings for 
> the ioExecutor thread pool and xK state files, this can reduce the cleanup 
> time from 10 minutes to <1 minute.





[jira] [Updated] (FLINK-32953) [State TTL]resolve data correctness problem after ttl was changed

2023-09-14 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32953?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu updated FLINK-32953:
-
Fix Version/s: 1.19.0

> [State TTL]resolve data correctness problem after ttl was changed 
> --
>
> Key: FLINK-32953
> URL: https://issues.apache.org/jira/browse/FLINK-32953
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Jinzhong Li
>Assignee: Jinzhong Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> Because expired data is cleaned up in the background on a best-effort basis 
> (hashmap uses the INCREMENTAL_CLEANUP strategy, rocksdb uses the 
> ROCKSDB_COMPACTION_FILTER strategy), some expired state is often persisted 
> into snapshots.
>  
> In some scenarios, the user changes the state TTL of the job and then 
> restores the job from the old state. If the user adjusts the state TTL from a 
> short value to a long value (e.g., from 12 hours to 24 hours), some expired 
> data that was not cleaned up will be alive again after restore. Obviously 
> this is unreasonable, and may break data regulatory requirements. 
>  
> In particular, the rocksdb state backend may cause data correctness problems 
> due to level compaction in this case (e.g., one key has two versions, at 
> level-1 and level-2, both of which are TTL-expired. Then the level-1 version 
> is cleaned up by compaction, and the level-2 version isn't. If we adjust the 
> state TTL and restart the job, the incorrect data of level-2 will become 
> valid after restore).
>  
> To solve this problem, I think we can
> 1) persist the old state TTL into the snapshot meta info (e.g., 
> RegisteredKeyValueStateBackendMetaInfo or others);
> 2) during state restore, compare the current TTL with the old TTL;
> 3) if the current TTL is longer than the old TTL, iterate over all data, 
> filter out data that expired under the old TTL, and write the valid data into 
> the state backend.
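
Steps 2) and 3) of the proposal quoted above could look roughly like the sketch below. It is a toy model under stated assumptions, not Flink code: `TtlRestoreFilterSketch` and `TimestampedValue` are hypothetical names, a plain map stands in for the state backend, and the old TTL is assumed to have been read back from the snapshot meta info as in step 1).

```java
import java.util.HashMap;
import java.util.Map;

class TtlRestoreFilterSketch {
    // Hypothetical state entry: a value plus its last-access timestamp in milliseconds.
    static final class TimestampedValue {
        final String value;
        final long lastAccessMs;

        TimestampedValue(String value, long lastAccessMs) {
            this.value = value;
            this.lastAccessMs = lastAccessMs;
        }
    }

    // Restores a snapshot, dropping entries that had already expired under the old TTL
    // whenever the new TTL is longer than the old one.
    static Map<String, TimestampedValue> restore(
            Map<String, TimestampedValue> snapshot, long oldTtlMs, long newTtlMs, long nowMs) {
        // Step 2: compare the current TTL with the TTL stored in the snapshot.
        if (newTtlMs <= oldTtlMs) {
            return new HashMap<>(snapshot); // nothing can be revived, restore as-is
        }
        // Step 3: iterate over all data and keep only what was still valid under the old TTL.
        Map<String, TimestampedValue> restored = new HashMap<>();
        for (Map.Entry<String, TimestampedValue> e : snapshot.entrySet()) {
            boolean expiredUnderOldTtl = e.getValue().lastAccessMs + oldTtlMs <= nowMs;
            if (!expiredUnderOldTtl) {
                restored.put(e.getKey(), e.getValue());
            }
        }
        return restored;
    }
}
```

The full iteration in step 3 is the expensive part of this approach, since it touches every entry on restore.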





[jira] [Commented] (FLINK-32953) [State TTL]resolve data correctness problem after ttl was changed

2023-09-14 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17765422#comment-17765422
 ] 

Hangxiang Yu commented on FLINK-32953:
--

merged 9a891f117c3a44a633316b84f9cbf2a541b80d11 into master

> [State TTL]resolve data correctness problem after ttl was changed 
> --
>
> Key: FLINK-32953
> URL: https://issues.apache.org/jira/browse/FLINK-32953
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Jinzhong Li
>Assignee: Jinzhong Li
>Priority: Major
>  Labels: pull-request-available
>
> Because expired data is cleaned up in the background on a best-effort basis 
> (hashmap uses the INCREMENTAL_CLEANUP strategy, rocksdb uses the 
> ROCKSDB_COMPACTION_FILTER strategy), some expired state is often persisted 
> into snapshots.
>  
> In some scenarios, the user changes the state TTL of the job and then 
> restores the job from the old state. If the user adjusts the state TTL from a 
> short value to a long value (e.g., from 12 hours to 24 hours), some expired 
> data that was not cleaned up will be alive again after restore. Obviously 
> this is unreasonable, and may break data regulatory requirements. 
>  
> In particular, the rocksdb state backend may cause data correctness problems 
> due to level compaction in this case (e.g., one key has two versions, at 
> level-1 and level-2, both of which are TTL-expired. Then the level-1 version 
> is cleaned up by compaction, and the level-2 version isn't. If we adjust the 
> state TTL and restart the job, the incorrect data of level-2 will become 
> valid after restore).
>  
> To solve this problem, I think we can
> 1) persist the old state TTL into the snapshot meta info (e.g., 
> RegisteredKeyValueStateBackendMetaInfo or others);
> 2) during state restore, compare the current TTL with the old TTL;
> 3) if the current TTL is longer than the old TTL, iterate over all data, 
> filter out data that expired under the old TTL, and write the valid data into 
> the state backend.





[jira] [Resolved] (FLINK-14032) Make the cache size of RocksDBPriorityQueueSetFactory configurable

2023-09-13 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu resolved FLINK-14032.
--
Fix Version/s: 1.19.0
   (was: 1.18.0)
   Resolution: Fixed

merged 3671960ec519d3eea6a78e498d61c860db1a2d6b into master

> Make the cache size of RocksDBPriorityQueueSetFactory configurable
> --
>
> Key: FLINK-14032
> URL: https://issues.apache.org/jira/browse/FLINK-14032
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Jinzhong Li
>Priority: Major
>  Labels: auto-unassigned, pull-request-available, usability
> Fix For: 1.19.0
>
>
> Currently, the cache size of {{RocksDBPriorityQueueSetFactory}} is set to 
> 128, and there is no way to configure it to another value. (We could increase 
> it to obtain better performance if necessary.) Actually, this has also been a 
> TODO for quite a long time.





[jira] [Resolved] (FLINK-33034) Incorrect StateBackendTestBase#testGetKeysAndNamespaces

2023-09-12 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu resolved FLINK-33034.
--
Fix Version/s: 1.19.0
   Resolution: Fixed

merged 36b9da50da9405b5b79f0d4da9393921982ab040 into master

> Incorrect StateBackendTestBase#testGetKeysAndNamespaces
> ---
>
> Key: FLINK-33034
> URL: https://issues.apache.org/jira/browse/FLINK-33034
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.12.2, 1.15.0, 1.17.1
>Reporter: Dmitriy Linevich
>Assignee: Dmitriy Linevich
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.19.0
>
> Attachments: image-2023-09-05-12-51-28-203.png
>
>
> In this test, the first namespace 'ns1' doesn't exist in state, because the 
> ValueState is created incorrectly for the test (when the 2nd value state is 
> created, namespace 'ns1' is overwritten by namespace 'ns2'). This needs to be 
> fixed by changing either how the ValueState is created or how this state is 
> updated.
>  
> If we add the following code to check the number of namespaces added to state 
> [here|https://github.com/apache/flink/blob/3e6a1aab0712acec3e9fcc955a28f2598f019377/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java#L501C28-L501C28]
> {code:java}
> assertThat(keysByNamespace.size(), is(2)); {code}
> then
> !image-2023-09-05-12-51-28-203.png!





[jira] [Updated] (FLINK-23411) Expose Flink checkpoint details metrics

2023-09-12 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu updated FLINK-23411:
-
Component/s: Runtime / Checkpointing

> Expose Flink checkpoint details metrics
> ---
>
> Key: FLINK-23411
> URL: https://issues.apache.org/jira/browse/FLINK-23411
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / Metrics
>Affects Versions: 1.13.1, 1.12.4
>Reporter: Jun Qin
>Assignee: Hangxiang Yu
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.18.0
>
>
> The checkpoint metrics as shown in the Flink Web UI like the 
> sync/async/alignment/start delay are not exposed to the metrics system. This 
> makes problem investigation harder when Web UI is not enabled: those numbers 
> can not get in the DEBUG logs. I think we should see how we can expose 
> metrics.





[jira] [Commented] (FLINK-23411) Expose Flink checkpoint details metrics

2023-09-12 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17764477#comment-17764477
 ] 

Hangxiang Yu commented on FLINK-23411:
--

Thanks for the quick reply.
{quote}Anyway, I think best way would be to first think through OTEL/Traces 
integration, add support for that, and then add those new checkpointing metrics 
from this ticket in this new model. However it's indeed much more work 
(including writing and voting on a FLIP)
{quote}
Yeah, supporting OTEL/Traces integration makes sense.

Besides some task-level metrics like this, users could also report their 
own operator metrics to their own distributed tracing system, where they may 
be traced together with other jobs or systems.

I could also try to improve this when I'm free if you also think it's helpful 
for users.
{quote}One thing is that in order to not bloat metric system too much, we 
should implement this as an opt-in feature, hidden behind a feature toggle, 
that users would have to manually enable in order to see those metrics.
{quote}
Sure, I agree. Thanks for the advice.

 

> Expose Flink checkpoint details metrics
> ---
>
> Key: FLINK-23411
> URL: https://issues.apache.org/jira/browse/FLINK-23411
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Affects Versions: 1.13.1, 1.12.4
>Reporter: Jun Qin
>Assignee: Hangxiang Yu
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.18.0
>
>
> The checkpoint metrics as shown in the Flink Web UI like the 
> sync/async/alignment/start delay are not exposed to the metrics system. This 
> makes problem investigation harder when Web UI is not enabled: those numbers 
> can not get in the DEBUG logs. I think we should see how we can expose 
> metrics.





[jira] [Commented] (FLINK-32953) [State TTL]resolve data correctness problem after ttl was changed

2023-09-12 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17764041#comment-17764041
 ] 

Hangxiang Yu commented on FLINK-32953:
--

[~lijinzhong] Thanks for the effort.

As discussed offline, I think it's fine to resolve this by first adding a 
warning to the documentation.

We can continue to dive into this problem when we have more feedback from 
users or other developers.

 

> [State TTL]resolve data correctness problem after ttl was changed 
> --
>
> Key: FLINK-32953
> URL: https://issues.apache.org/jira/browse/FLINK-32953
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Jinzhong Li
>Assignee: Jinzhong Li
>Priority: Major
>
> Because expired data is cleaned up in the background on a best-effort basis 
> (hashmap uses the INCREMENTAL_CLEANUP strategy, rocksdb uses the 
> ROCKSDB_COMPACTION_FILTER strategy), some expired state is often persisted 
> into snapshots.
>  
> In some scenarios, the user changes the state TTL of the job and then 
> restores the job from the old state. If the user adjusts the state TTL from a 
> short value to a long value (e.g., from 12 hours to 24 hours), some expired 
> data that was not cleaned up will be alive again after restore. Obviously 
> this is unreasonable, and may break data regulatory requirements. 
>  
> In particular, the rocksdb state backend may cause data correctness problems 
> due to level compaction in this case (e.g., one key has two versions, at 
> level-1 and level-2, both of which are TTL-expired. Then the level-1 version 
> is cleaned up by compaction, and the level-2 version isn't. If we adjust the 
> state TTL and restart the job, the incorrect data of level-2 will become 
> valid after restore).
>  
> To solve this problem, I think we can
> 1) persist the old state TTL into the snapshot meta info (e.g., 
> RegisteredKeyValueStateBackendMetaInfo or others);
> 2) during state restore, compare the current TTL with the old TTL;
> 3) if the current TTL is longer than the old TTL, iterate over all data, 
> filter out data that expired under the old TTL, and write the valid data into 
> the state backend.





[jira] [Commented] (FLINK-23411) Expose Flink checkpoint details metrics

2023-09-12 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17764038#comment-17764038
 ] 

Hangxiang Yu commented on FLINK-23411:
--

Hi [~pnowojski], thanks for picking this up.
I think it's indeed a problem that all task-level metrics have, and 
checkpoint-related metrics make it more obvious, since they are related to 
checkpoint duration.

[distributed 
tracing|https://newrelic.com/blog/how-to-relic/distributed-tracing-anomaly-detection]
 and OTEL sound like an interesting idea. Maybe we could still register some 
task-level metrics like this which could be unregistered, and it could work 
with OTEL.

It's fine for me to resolve FLINK-33071 first.

> Expose Flink checkpoint details metrics
> ---
>
> Key: FLINK-23411
> URL: https://issues.apache.org/jira/browse/FLINK-23411
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Affects Versions: 1.13.1, 1.12.4
>Reporter: Jun Qin
>Assignee: Hangxiang Yu
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.18.0
>
>
> The checkpoint metrics as shown in the Flink Web UI like the 
> sync/async/alignment/start delay are not exposed to the metrics system. This 
> makes problem investigation harder when Web UI is not enabled: those numbers 
> can not get in the DEBUG logs. I think we should see how we can expose 
> metrics.





[jira] [Resolved] (FLINK-33055) Correct the error value about 'state.backend.type' in the document

2023-09-08 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu resolved FLINK-33055.
--
Fix Version/s: 1.19.0
   Resolution: Fixed

merged 53ccb93cfb3b6f2befdb13cdeb9d96acbef44932 into master

> Correct the error value about 'state.backend.type' in the document
> --
>
> Key: FLINK-33055
> URL: https://issues.apache.org/jira/browse/FLINK-33055
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Runtime / State Backends
>Reporter: Hangxiang Yu
>Assignee: Jinzhong Li
>Priority: Minor
> Fix For: 1.19.0
>
>
>  
> {code:java}
> state.backend.type: The state backend to use. This defines the data structure 
> mechanism for taking snapshots. Common values are filesystem or rocksdb{code}
> filesystem should be replaced with hashmap after FLINK-16444.





[jira] [Resolved] (FLINK-30217) Use ListState#update() to replace clear + add mode.

2023-09-07 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu resolved FLINK-30217.
--
Fix Version/s: 1.19.0
   Resolution: Fixed

merged 20c0acf60d137cd613914503f1b40e7b2adb86c1 into master

> Use ListState#update() to replace clear + add mode.
> ---
>
> Key: FLINK-30217
> URL: https://issues.apache.org/jira/browse/FLINK-30217
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: xljtswf
>Assignee: Zakelly Lan
>Priority: Major
> Fix For: 1.19.0
>
>
> When using ListState, I found that we often need to clear the current state 
> and then add new values. This is especially common in 
> CheckpointedFunction#snapshotState, and it is slower than just using 
> ListState#update().
> Suppose we want to update the ListState to contain value1, value2, value3.
> With the current implementation, we first call ListState#clear(), which 
> updates the state 1 time.
> Then we add value1, value2, value3 to the state.
> If we use heap state, we need to search the stateTable 3 times and add 3 
> values to the list.
> This happens in memory and is not too bad.
> If we use RocksDB, we call backend.db.merge() 3 times.
> In total, we update the state 4 times.
> The more values to be added, the more times we update the state.
> If we use ListState#update() instead, we only need to update the state 1 
> time. I think this can save a lot of time.
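
The write counts in the description above (4 versus 1 for three values) can be reproduced with a toy model. `CountingListState` is a hypothetical class that only mirrors the clear()/add()/update() shape of ListState and counts each call as one backend write; it is not a Flink state implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy list state that counts every call as one write to the backend.
class CountingListState<T> {
    private final List<T> values = new ArrayList<>();
    private int writes = 0;

    void clear() {
        values.clear();
        writes++;
    }

    void add(T value) {
        values.add(value);
        writes++;
    }

    void update(List<T> newValues) {
        values.clear();
        values.addAll(newValues);
        writes++;
    }

    int writes() {
        return writes;
    }

    List<T> get() {
        return new ArrayList<>(values);
    }
}

class ListStateUpdateSketch {
    // clear + add pattern: one write for clear(), then one per added value.
    static int writesWithClearAndAdd(List<String> newValues) {
        CountingListState<String> state = new CountingListState<>();
        state.clear();
        for (String v : newValues) {
            state.add(v);
        }
        return state.writes();
    }

    // update pattern: a single write regardless of the number of values.
    static int writesWithUpdate(List<String> newValues) {
        CountingListState<String> state = new CountingListState<>();
        state.update(newValues);
        return state.writes();
    }
}
```

In this model the clear + add pattern costs n + 1 writes for n values, while update() stays at one, which matches the reasoning in the description.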





[jira] [Commented] (FLINK-33055) Correct the error value about 'state.backend.type' in the document

2023-09-07 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17762950#comment-17762950
 ] 

Hangxiang Yu commented on FLINK-33055:
--

[~lijinzhong] Thanks for volunteering, already assigned to you, please go ahead.

> Correct the error value about 'state.backend.type' in the document
> --
>
> Key: FLINK-33055
> URL: https://issues.apache.org/jira/browse/FLINK-33055
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Runtime / State Backends
>Reporter: Hangxiang Yu
>Assignee: Jinzhong Li
>Priority: Minor
>
>  
> {code:java}
> state.backend.type: The state backend to use. This defines the data structure 
> mechanism for taking snapshots. Common values are filesystem or rocksdb{code}
> filesystem should be replaced with hashmap after FLINK-16444.





[jira] [Assigned] (FLINK-33055) Correct the error value about 'state.backend.type' in the document

2023-09-07 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu reassigned FLINK-33055:


Assignee: Jinzhong Li

> Correct the error value about 'state.backend.type' in the document
> --
>
> Key: FLINK-33055
> URL: https://issues.apache.org/jira/browse/FLINK-33055
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Runtime / State Backends
>Reporter: Hangxiang Yu
>Assignee: Jinzhong Li
>Priority: Minor
>
>  
> {code:java}
> state.backend.type: The state backend to use. This defines the data structure 
> mechanism for taking snapshots. Common values are filesystem or rocksdb{code}
> filesystem should be replaced with hashmap after FLINK-16444.





[jira] [Resolved] (FLINK-15922) Show "Warn - received late message for checkpoint" only when checkpoint actually expired

2023-09-07 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu resolved FLINK-15922.
--
Fix Version/s: 1.19.0
   Resolution: Fixed

merged 80a8250176fb2b1d9809fcd47df6097fd53999f6 into master

> Show "Warn - received late message for checkpoint" only when checkpoint 
> actually expired
> 
>
> Key: FLINK-15922
> URL: https://issues.apache.org/jira/browse/FLINK-15922
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.10.0
>Reporter: Stephan Ewen
>Assignee: Zakelly Lan
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, beginner, beginner-friendly, 
> usability
> Fix For: 1.19.0
>
>
> The message "Warn - received late message for checkpoint" is shown frequently 
> in the logs, also when a checkpoint was purposefully canceled.
> In those cases, this message is unhelpful and misleading.
> We should log this only when the checkpoint is actually expired.
> Meaning that when receiving the message, we check if we have an expired 
> checkpoint for that ID. If yes, we log that message, if not, we simply drop 
> the message.
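
The check described above might be sketched as follows. This is an assumption-laden illustration, not the merged change: `LateMessageTracker` is a hypothetical class, and remembering only a bounded number of recently expired checkpoint IDs is a design choice made here to keep memory use fixed.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical tracker for recently expired checkpoint IDs.
class LateMessageTracker {
    private final Deque<Long> recentlyExpired = new ArrayDeque<>();
    private final int capacity;

    LateMessageTracker(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    // Remembers an expired checkpoint ID, evicting the oldest one when full.
    void markExpired(long checkpointId) {
        if (recentlyExpired.size() == capacity) {
            recentlyExpired.removeFirst();
        }
        recentlyExpired.addLast(checkpointId);
    }

    // Logs the warning only if the message belongs to a checkpoint known to have expired;
    // otherwise the message is dropped silently. Returns whether a warning was logged.
    boolean onLateMessage(long checkpointId) {
        if (recentlyExpired.contains(checkpointId)) {
            System.out.println("WARN - received late message for checkpoint " + checkpointId);
            return true;
        }
        return false;
    }
}
```

With a bounded history, a very late message for a checkpoint that has already been evicted is dropped without a warning, which seems acceptable for a log-noise fix.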





[jira] [Commented] (FLINK-33057) Add options to disable creating job-id subdirectories under the checkpoint directory

2023-09-07 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17762627#comment-17762627
 ] 

Hangxiang Yu commented on FLINK-33057:
--

Thanks for picking this up.

+1 for introducing such an option.

> Add options to disable creating job-id subdirectories under the checkpoint 
> directory
> 
>
> Key: FLINK-33057
> URL: https://issues.apache.org/jira/browse/FLINK-33057
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.19.0
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
>
> By default, Flink creates subdirectories named by UUID (job id) under the 
> checkpoint directory for each job. It's a good means to avoid collisions. 
> However, it also takes some effort to remember/find the right directory 
> when recovering from a previous checkpoint. According to previous discussion 
> ([Yun 
> Tang's|https://issues.apache.org/jira/browse/FLINK-11789?focusedCommentId=16782314=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16782314]
>  and [Stephan 
> Ewen's|https://issues.apache.org/jira/browse/FLINK-9043?focusedCommentId=16409254=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16409254]
>  ), I think it would be useful to add an option to disable creating the UUID 
> subdirectories under the checkpoint directory. For compatibility 
> considerations, we create the subdirectories by default.





[jira] [Comment Edited] (FLINK-9631) use Files.createDirectories instead of directory.mkdirs

2023-09-07 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-9631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17762601#comment-17762601
 ] 

Hangxiang Yu edited comment on FLINK-9631 at 9/7/23 7:51 AM:
-

[~zhangyy91] Thanks for picking this up.

Already assigned to you, please go ahead.


was (Author: masteryhx):
Thanks for picking this up.

Already assigned to you, please go ahead.

> use Files.createDirectories instead of directory.mkdirs
> ---
>
> Key: FLINK-9631
> URL: https://issues.apache.org/jira/browse/FLINK-9631
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.4.2, 1.5.0
> Environment: flink1.4
> jdk1.8 latest
> linux 2.6
>Reporter: makeyang
>Assignee: Yangyang ZHANG
>Priority: Not a Priority
>  Labels: auto-deprioritized-minor, auto-unassigned
>
> A job can't be run due to the exception below:
> {color:#6a8759}Could not create RocksDB data directory{color}
>  but with this exception, I can't tell exactly why.
> So I suggest that Files.createDirectories, which throws an exception, be used 
> rather than File.mkdirs.
>  
> I have some more suggestions:
>  # Should we use Files.createDirectories to replace File.mkdirs?
>  # Each time a task manager throws an exception to the jobmanager, should 
> IP+nodeId be contained in the exception, which means we should define more 
> Flink exceptions that are used to wrap other exceptions such as JDK exceptions?





[jira] [Assigned] (FLINK-9631) use Files.createDirectories instead of directory.mkdirs

2023-09-07 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-9631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu reassigned FLINK-9631:
---

Assignee: Yangyang ZHANG

> use Files.createDirectories instead of directory.mkdirs
> ---
>
> Key: FLINK-9631
> URL: https://issues.apache.org/jira/browse/FLINK-9631
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.4.2, 1.5.0
> Environment: flink1.4
> jdk1.8 latest
> linux 2.6
>Reporter: makeyang
>Assignee: Yangyang ZHANG
>Priority: Not a Priority
>  Labels: auto-deprioritized-minor, auto-unassigned
>
> A job can't be run due to the exception below:
> {color:#6a8759}Could not create RocksDB data directory{color}
>  but with this exception, I can't tell exactly why.
> So I suggest that Files.createDirectories, which throws an exception, be used 
> rather than File.mkdirs.
>  
> I have some more suggestions:
>  # Should we use Files.createDirectories to replace File.mkdirs?
>  # Each time a task manager throws an exception to the jobmanager, should 
> IP+nodeId be contained in the exception, which means we should define more 
> Flink exceptions that are used to wrap other exceptions such as JDK exceptions?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-9631) use Files.createDirectories instead of directory.mkdirs

2023-09-07 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-9631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17762601#comment-17762601
 ] 

Hangxiang Yu commented on FLINK-9631:
-

Thanks for picking this up.

Already assigned to you, please go ahead.

> use Files.createDirectories instead of directory.mkdirs
> ---
>
> Key: FLINK-9631
> URL: https://issues.apache.org/jira/browse/FLINK-9631
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.4.2, 1.5.0
> Environment: flink1.4
> jdk1.8 latest
> linux 2.6
>Reporter: makeyang
>Priority: Not a Priority
>  Labels: auto-deprioritized-minor, auto-unassigned
>
> A job can't be run due to the exception below:
> {color:#6a8759}Could not create RocksDB data directory{color}
>  but with this exception, I can't tell exactly why.
> So I suggest that Files.createDirectories, which throws an exception, be used 
> rather than File.mkdirs.
>  
> I have some more suggestions:
>  # Should we use Files.createDirectories to replace File.mkdirs?
>  # Each time a task manager throws an exception to the jobmanager, should 
> IP+nodeId be contained in the exception, which means we should define more 
> Flink exceptions that are used to wrap other exceptions such as JDK exceptions?





[jira] [Commented] (FLINK-31685) Checkpoint job folder not deleted after job is cancelled

2023-09-07 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17762597#comment-17762597
 ] 

Hangxiang Yu commented on FLINK-31685:
--

[~Zakelly] 
This makes sense to me.
_FsCompletedCheckpointStorageLocation_ loses the global view of the checkpoint directory info.
+1 for deleting the directory only when we know all checkpoint files are 
deleted.

> Checkpoint job folder not deleted after job is cancelled
> 
>
> Key: FLINK-31685
> URL: https://issues.apache.org/jira/browse/FLINK-31685
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.16.1
>Reporter: Sergio Sainz
>Priority: Major
>
> When flink job is being checkpointed, and after the job is cancelled, the 
> checkpoint is indeed deleted (as per 
> {{{}execution.checkpointing.externalized-checkpoint-retention: 
> DELETE_ON_CANCELLATION{}}}), but the job-id folder still remains:
>  
> [sergio@flink-cluster-54f7fc7c6-k6km8 JobCheckpoints]$ ls
> 01eff17aa2910484b5aeb644bc531172  3a59309ef018541fc0c20856d0d89855  
> 78ff2344dd7ef89f9fbcc9789fc0cd79  a6fd7cec89c0af78c3353d4a46a7d273  
> dbc957868c08ebeb100d708bbd057593
> 04ff0abb9e860fc85f0e39d722367c3c  3e09166341615b1b4786efd6745a05d6  
> 79efc000aa29522f0a9598661f485f67  a8c42bfe158abd78ebcb4adb135de61f  
> dc8e04b02c9d8a1bc04b21d2c8f21f74
> 05f48019475de40230900230c63cfe89  3f9fb467c9af91ef41d527fe92f9b590  
> 7a6ad7407d7120eda635d71cd843916a  a8db748c1d329407405387ac82040be4  
> dfb2df1c25056e920d41c94b659dcdab
> 09d30bc0ff786994a6a3bb06abd3  455525b76a1c6826b6eaebd5649c5b6b  
> 7b1458424496baaf3d020e9fece525a4  aa2ef9587b2e9c123744e8940a66a287
> All folders in the above list, like {{01eff17aa2910484b5aeb644bc531172}}  , 
> are empty ~
>  
> *Expected behaviour:*
> The job folder id should also be deleted.





[jira] [Updated] (FLINK-33055) Correct the error value about 'state.backend.type' in the document

2023-09-07 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu updated FLINK-33055:
-
Description: 
 
{code:java}
state.backend.type: The state backend to use. This defines the data structure 
mechanism for taking snapshots. Common values are filesystem or rocksdb{code}
filesystem should be replaced with hashmap after FLINK-16444.

  was:
{{}}
{code:java}
state.backend.type: The state backend to use. This defines the data structure 
mechanism for taking snapshots. Common values are filesystem or rocksdb{code}
filesystem should be replaced with hashmap after FLINK-16444.

{{}}


> Correct the error value about 'state.backend.type' in the document
> --
>
> Key: FLINK-33055
> URL: https://issues.apache.org/jira/browse/FLINK-33055
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Runtime / State Backends
>Reporter: Hangxiang Yu
>Priority: Minor
>
>  
> {code:java}
> state.backend.type: The state backend to use. This defines the data structure 
> mechanism for taking snapshots. Common values are filesystem or rocksdb{code}
> filesystem should be replaced with hashmap after FLINK-16444.





[jira] [Created] (FLINK-33055) Correct the error value about 'state.backend.type' in the document

2023-09-07 Thread Hangxiang Yu (Jira)
Hangxiang Yu created FLINK-33055:


 Summary: Correct the error value about 'state.backend.type' in the 
document
 Key: FLINK-33055
 URL: https://issues.apache.org/jira/browse/FLINK-33055
 Project: Flink
  Issue Type: Bug
  Components: Documentation, Runtime / State Backends
Reporter: Hangxiang Yu


{{}}
{code:java}
state.backend.type: The state backend to use. This defines the data structure 
mechanism for taking snapshots. Common values are filesystem or rocksdb{code}
filesystem should be replaced with hashmap after FLINK-16444.

{{}}





[jira] [Assigned] (FLINK-15922) Show "Warn - received late message for checkpoint" only when checkpoint actually expired

2023-09-06 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu reassigned FLINK-15922:


Assignee: Zakelly Lan

> Show "Warn - received late message for checkpoint" only when checkpoint 
> actually expired
> 
>
> Key: FLINK-15922
> URL: https://issues.apache.org/jira/browse/FLINK-15922
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.10.0
>Reporter: Stephan Ewen
>Assignee: Zakelly Lan
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, beginner, beginner-friendly, 
> usability
>
> The message "Warn - received late message for checkpoint" is shown frequently 
> in the logs, also when a checkpoint was purposefully canceled.
> In those cases, this message is unhelpful and misleading.
> We should log this only when the checkpoint is actually expired.
> That is, when receiving the message, we check whether we have an expired 
> checkpoint for that ID. If so, we log the message; if not, we simply drop 
> it.
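
The check proposed above can be sketched roughly as follows. This is only an illustration under stated assumptions, not Flink's actual checkpoint coordinator code: the class `LateCheckpointMessageFilter` and its methods `markExpired` and `onLateMessage` are hypothetical names introduced here, and the warning text is made up for the example.

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

/** Hypothetical sketch of the proposed behavior; not Flink's real coordinator. */
final class LateCheckpointMessageFilter {

    // IDs of checkpoints that expired (timed out), as opposed to being canceled on purpose.
    // A real implementation would need to bound this set, e.g. by keeping only recent IDs.
    private final Set<Long> expiredCheckpointIds = new HashSet<>();

    /** Remembers that the checkpoint with the given ID expired. */
    void markExpired(long checkpointId) {
        expiredCheckpointIds.add(checkpointId);
    }

    /**
     * Handles a message that arrives for a checkpoint that is no longer pending.
     * Returns the warning to log if that checkpoint expired, or an empty Optional
     * if the message should simply be dropped.
     */
    Optional<String> onLateMessage(long checkpointId) {
        if (expiredCheckpointIds.contains(checkpointId)) {
            return Optional.of("Received late message for expired checkpoint " + checkpointId);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        LateCheckpointMessageFilter filter = new LateCheckpointMessageFilter();
        filter.markExpired(7L);
        // Checkpoint 7 expired, so a warning is produced; checkpoint 8 did not, so nothing is logged.
        System.out.println(filter.onLateMessage(7L).isPresent());
        System.out.println(filter.onLateMessage(8L).isPresent());
    }
}
```

Returning an `Optional` keeps the decision (log or drop) separate from the logging framework, which makes the rule easy to unit-test.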





[jira] [Resolved] (FLINK-5279) Improve error message when trying to access keyed state in non-keyed operator

2023-09-06 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-5279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu resolved FLINK-5279.
-
Fix Version/s: 1.19.0
   Resolution: Fixed

merged 7b6243bb0ba55aafad1ca5a17bc457d229763433 into master

> Improve error message when trying to access keyed state in non-keyed operator
> -
>
> Key: FLINK-5279
> URL: https://issues.apache.org/jira/browse/FLINK-5279
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.1.3
>Reporter: Ufuk Celebi
>Assignee: Zakelly Lan
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor
> Fix For: 1.19.0
>
>
> When trying to access keyed state in a non-keyed operator, the error message 
> is not very helpful. You get a trace like this:
> {code}
> java.lang.RuntimeException: Error while getting state
> ...
> Caused by: java.lang.RuntimeException: State key serializer has not been 
> configured in the config. This operation cannot use partitioned state.
> {code}
> It would help users if the message were more explicit, stating, for example, 
> that the API can only be used on keyed streams.
> If this applies to the current master as well, we should fix it there, too.





[jira] [Updated] (FLINK-32523) NotifyCheckpointAbortedITCase.testNotifyCheckpointAborted fails with timeout on AZP

2023-08-31 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu updated FLINK-32523:
-
Fix Version/s: 1.16.3
   1.17.2

> NotifyCheckpointAbortedITCase.testNotifyCheckpointAborted fails with timeout 
> on AZP
> ---
>
> Key: FLINK-32523
> URL: https://issues.apache.org/jira/browse/FLINK-32523
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.16.2, 1.18.0, 1.17.1
>Reporter: Sergey Nuyanzin
>Assignee: Hangxiang Yu
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.18.0, 1.16.3, 1.17.2
>
> Attachments: failure.log
>
>
> This build
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50795&view=logs&j=39d5b1d5-3b41-54dc-6458-1e2ddd1cdcf3&t=0c010d0c-3dec-5bf1-d408-7b18988b1b2b&l=8638
>  fails with timeout
> {noformat}
> Jul 03 01:26:35 org.junit.runners.model.TestTimedOutException: test timed out 
> after 10 milliseconds
> Jul 03 01:26:35   at java.lang.Object.wait(Native Method)
> Jul 03 01:26:35   at java.lang.Object.wait(Object.java:502)
> Jul 03 01:26:35   at 
> org.apache.flink.core.testutils.OneShotLatch.await(OneShotLatch.java:61)
> Jul 03 01:26:35   at 
> org.apache.flink.test.checkpointing.NotifyCheckpointAbortedITCase.verifyAllOperatorsNotifyAborted(NotifyCheckpointAbortedITCase.java:198)
> Jul 03 01:26:35   at 
> org.apache.flink.test.checkpointing.NotifyCheckpointAbortedITCase.testNotifyCheckpointAborted(NotifyCheckpointAbortedITCase.java:189)
> Jul 03 01:26:35   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Jul 03 01:26:35   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Jul 03 01:26:35   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Jul 03 01:26:35   at java.lang.reflect.Method.invoke(Method.java:498)
> Jul 03 01:26:35   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> Jul 03 01:26:35   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Jul 03 01:26:35   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> Jul 03 01:26:35   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Jul 03 01:26:35   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
> Jul 03 01:26:35   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
> Jul 03 01:26:35   at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> Jul 03 01:26:35   at java.lang.Thread.run(Thread.java:748)
> {noformat}





[jira] [Updated] (FLINK-32523) NotifyCheckpointAbortedITCase.testNotifyCheckpointAborted fails with timeout on AZP

2023-08-31 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu updated FLINK-32523:
-
Affects Version/s: 1.17.1

> NotifyCheckpointAbortedITCase.testNotifyCheckpointAborted fails with timeout 
> on AZP
> ---
>
> Key: FLINK-32523
> URL: https://issues.apache.org/jira/browse/FLINK-32523
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.16.2, 1.18.0, 1.17.1
>Reporter: Sergey Nuyanzin
>Assignee: Hangxiang Yu
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.18.0
>
> Attachments: failure.log
>
>
> This build
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50795&view=logs&j=39d5b1d5-3b41-54dc-6458-1e2ddd1cdcf3&t=0c010d0c-3dec-5bf1-d408-7b18988b1b2b&l=8638
>  fails with timeout
> {noformat}
> Jul 03 01:26:35 org.junit.runners.model.TestTimedOutException: test timed out 
> after 10 milliseconds
> Jul 03 01:26:35   at java.lang.Object.wait(Native Method)
> Jul 03 01:26:35   at java.lang.Object.wait(Object.java:502)
> Jul 03 01:26:35   at 
> org.apache.flink.core.testutils.OneShotLatch.await(OneShotLatch.java:61)
> Jul 03 01:26:35   at 
> org.apache.flink.test.checkpointing.NotifyCheckpointAbortedITCase.verifyAllOperatorsNotifyAborted(NotifyCheckpointAbortedITCase.java:198)
> Jul 03 01:26:35   at 
> org.apache.flink.test.checkpointing.NotifyCheckpointAbortedITCase.testNotifyCheckpointAborted(NotifyCheckpointAbortedITCase.java:189)
> Jul 03 01:26:35   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Jul 03 01:26:35   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Jul 03 01:26:35   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Jul 03 01:26:35   at java.lang.reflect.Method.invoke(Method.java:498)
> Jul 03 01:26:35   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> Jul 03 01:26:35   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Jul 03 01:26:35   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> Jul 03 01:26:35   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Jul 03 01:26:35   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
> Jul 03 01:26:35   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
> Jul 03 01:26:35   at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> Jul 03 01:26:35   at java.lang.Thread.run(Thread.java:748)
> {noformat}





[jira] [Commented] (FLINK-32523) NotifyCheckpointAbortedITCase.testNotifyCheckpointAborted fails with timeout on AZP

2023-08-31 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17760847#comment-17760847
 ] 

Hangxiang Yu commented on FLINK-32523:
--

picked 66cc21d4e2c091c0f5211bf558d1a69364519f9b and merged into 1.16 & 1.17

> NotifyCheckpointAbortedITCase.testNotifyCheckpointAborted fails with timeout 
> on AZP
> ---
>
> Key: FLINK-32523
> URL: https://issues.apache.org/jira/browse/FLINK-32523
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.16.2, 1.18.0
>Reporter: Sergey Nuyanzin
>Assignee: Hangxiang Yu
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.18.0
>
> Attachments: failure.log
>
>
> This build
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50795&view=logs&j=39d5b1d5-3b41-54dc-6458-1e2ddd1cdcf3&t=0c010d0c-3dec-5bf1-d408-7b18988b1b2b&l=8638
>  fails with timeout
> {noformat}
> Jul 03 01:26:35 org.junit.runners.model.TestTimedOutException: test timed out 
> after 10 milliseconds
> Jul 03 01:26:35   at java.lang.Object.wait(Native Method)
> Jul 03 01:26:35   at java.lang.Object.wait(Object.java:502)
> Jul 03 01:26:35   at 
> org.apache.flink.core.testutils.OneShotLatch.await(OneShotLatch.java:61)
> Jul 03 01:26:35   at 
> org.apache.flink.test.checkpointing.NotifyCheckpointAbortedITCase.verifyAllOperatorsNotifyAborted(NotifyCheckpointAbortedITCase.java:198)
> Jul 03 01:26:35   at 
> org.apache.flink.test.checkpointing.NotifyCheckpointAbortedITCase.testNotifyCheckpointAborted(NotifyCheckpointAbortedITCase.java:189)
> Jul 03 01:26:35   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Jul 03 01:26:35   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Jul 03 01:26:35   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Jul 03 01:26:35   at java.lang.reflect.Method.invoke(Method.java:498)
> Jul 03 01:26:35   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> Jul 03 01:26:35   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Jul 03 01:26:35   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> Jul 03 01:26:35   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Jul 03 01:26:35   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
> Jul 03 01:26:35   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
> Jul 03 01:26:35   at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> Jul 03 01:26:35   at java.lang.Thread.run(Thread.java:748)
> {noformat}





[jira] [Resolved] (FLINK-32975) Enhance equal() for all MapState's iterator

2023-08-30 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu resolved FLINK-32975.
--
Fix Version/s: 1.19.0
   Resolution: Fixed

merged 
[aa8d93ea|https://github.com/apache/flink/commit/aa8d93ea239f5be79066b7e5caad08d966c86ab2]
 into master

> Enhance equal() for all MapState's iterator
> ---
>
> Key: FLINK-32975
> URL: https://issues.apache.org/jira/browse/FLINK-32975
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends, Tests
>Reporter: Rui Xia
>Assignee: Rui Xia
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> This ticket originated from the JUnit version upgrade of the Changelog module.
> The assertThat() in junit5 uses Object#equals to compare two Map.Entry objects. The 
> anonymous Map.Entry class in ChangelogMapState uses the default 
> Object#equals(), which does not compare the contents of two entries.
> This ticket is to add a basic equals() implementation for the Map.Entry<UK, UV> in ChangelogMapState.
> EDIT: To be more general, equals() is also missing for the RocksDB MapState's 
> iterator. It would be better to align the comparison behavior of all 
> MapState entries. This ticket will correct both (RocksDB's and 
> Changelog's).
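
The problem described above can be reproduced without Flink. The following is a minimal sketch, not the code merged for this ticket: the class `EntryEqualsSketch` and its factory methods `identityEntry` and `contentEntry` are hypothetical names, and the `equals()`/`hashCode()` bodies simply follow the contract documented on `java.util.Map.Entry`.

```java
import java.util.AbstractMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical illustration of identity-based versus content-based entry equality. */
final class EntryEqualsSketch {

    /** Anonymous entry that inherits Object#equals, so two entries are equal only if they are the same object. */
    static <K, V> Map.Entry<K, V> identityEntry(K key, V value) {
        return new Map.Entry<K, V>() {
            @Override public K getKey() { return key; }
            @Override public V getValue() { return value; }
            @Override public V setValue(V newValue) { throw new UnsupportedOperationException(); }
        };
    }

    /** Anonymous entry that compares key and value, as the Map.Entry contract specifies. */
    static <K, V> Map.Entry<K, V> contentEntry(K key, V value) {
        return new Map.Entry<K, V>() {
            @Override public K getKey() { return key; }
            @Override public V getValue() { return value; }
            @Override public V setValue(V newValue) { throw new UnsupportedOperationException(); }

            @Override
            public boolean equals(Object o) {
                if (this == o) {
                    return true;
                }
                if (!(o instanceof Map.Entry)) {
                    return false;
                }
                Map.Entry<?, ?> other = (Map.Entry<?, ?>) o;
                return Objects.equals(key, other.getKey()) && Objects.equals(value, other.getValue());
            }

            @Override
            public int hashCode() {
                // Same formula as documented for Map.Entry#hashCode.
                return Objects.hashCode(key) ^ Objects.hashCode(value);
            }
        };
    }

    public static void main(String[] args) {
        // Same contents, but the default equals compares references only.
        System.out.println(identityEntry("k", 1).equals(identityEntry("k", 1)));
        // Content-based equals also interoperates with standard entry implementations.
        System.out.println(contentEntry("k", 1).equals(new AbstractMap.SimpleEntry<>("k", 1)));
    }
}
```

With the content-based variant, assertions that rely on `Object#equals` to compare entries behave as expected, regardless of which entry implementation produced them.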




