[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1

2023-08-05 Thread Roman Khachatryan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751329#comment-17751329
 ] 

Roman Khachatryan commented on FLINK-29913:
---

javadoc updated in 
c7d3e36053091b7d7b10e89c5f9dba3df46ec0bd (master)
f2d3ca0bf4d49621450c7e71c1bd7c9de2deb89b (1.16)
e213267ebf691f1e959be6f5154a790c2ec6a2e9 (1.17)

> Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
> -
>
> Key: FLINK-29913
> URL: https://issues.apache.org/jira/browse/FLINK-29913
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.0, 1.16.0, 1.17.0
>Reporter: Yanfei Lei
>Assignee: Feifan Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0, 1.16.3, 1.17.2
>
>
> When maxConcurrentCheckpoint>1, the shared state of Incremental rocksdb state 
> backend would be discarded by registering the same name handle. See 
> [https://github.com/apache/flink/pull/21050#discussion_r1011061072]
> cc [~roman] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1

2023-08-05 Thread Roman Khachatryan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751308#comment-17751308
 ] 

Roman Khachatryan commented on FLINK-29913:
---

Thanks a lot [~Feifan Wang], happy to hear that :)

I've merged backports to 1.16 as 
aad658910fd6967199bce69f33872d245315bca1..c6c54412bf70af4432071dfdf131b62ff8236fe7
and to 1.17 as 
65e7004dd32633f9dfc87b0808ffcb587daf525c..f3d212956660a77d58ab99f4de6611760e5f0b9a.

I've taken a look at javadoc update, thanks for noticing this (y)



[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1

2023-08-05 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751304#comment-17751304
 ] 

Feifan Wang commented on FLINK-29913:
-

Hi [~roman], in addition to the backport PRs for 
[1.16|https://github.com/apache/flink/pull/23137] and 
[1.17|https://github.com/apache/flink/pull/23139], I also submitted [a PR to 
fix the outdated Javadoc in 
SharedStateRegistry|https://github.com/apache/flink/pull/23147]. PTAL.



[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1

2023-08-04 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1775#comment-1775
 ] 

Feifan Wang commented on FLINK-29913:
-

[~roman] ,  I submitted two PRs for 
[1.16|https://github.com/apache/flink/pull/23137] and 
[1.17|https://github.com/apache/flink/pull/23139], please take a look.



[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1

2023-08-03 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17750647#comment-17750647
 ] 

Feifan Wang commented on FLINK-29913:
-

 Thanks to [~roman]  for helping to review the PR, I learned a lot from it.
{quote}do you mind creating backport PRs for branches 1.17 and 1.16?
{quote}
I am glad to.



[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1

2023-08-03 Thread Roman Khachatryan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17750636#comment-17750636
 ] 

Roman Khachatryan commented on FLINK-29913:
---

Merged into master as 6dcb10abf319b9e5494a82bee71f1ae1a4e4b211 ... 
85f32d6bcb31708c9c2c845ea03ef117726b7c1a.

[~Feifan Wang] do you mind creating backport PRs for branches 1.17 and 1.16?



[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1

2023-05-28 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17726933#comment-17726933
 ] 

Feifan Wang commented on FLINK-29913:
-

Thanks for the information [~klion26]. I would like to add one more case that I 
recently thought of: since 1.17, users can trigger a checkpoint with an assigned 
checkpoint type (CONFIGURED, FULL, INCREMENTAL). If a user triggers a FULL 
checkpoint through the REST API, this forces a re-upload of snapshot files that 
have already been uploaded. A register key based on the remote file path is 
still valid in this case.

I have submitted a PR; please help review it [~roman], [~klion26].

> Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
> -
>
> Key: FLINK-29913
> URL: https://issues.apache.org/jira/browse/FLINK-29913
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.0, 1.16.0, 1.17.0
>Reporter: Yanfei Lei
>Assignee: Feifan Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.3, 1.17.2
>
>
> When maxConcurrentCheckpoint>1, the shared state of Incremental rocksdb state 
> backend would be discarded by registering the same name handle. See 
> [https://github.com/apache/flink/pull/21050#discussion_r1011061072]
> cc [~roman] 





[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1

2023-05-25 Thread Congxian Qiu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17726084#comment-17726084
 ] 

Congxian Qiu commented on FLINK-29913:
--

Thanks for the discussion above and for the contribution!

Using the UUID/filename as the key solves the problem here, and it also makes 
sense because the key and the remote file are then one-to-one. In addition, it 
can solve some other potential problems. For example, suppose a Flink job 
management platform uses the SharedRegistry to maintain the checkpoint 
lifecycle. If a task has two ssts with the same name, the current keying causes 
a file to be deleted by mistake. This situation occurs as follows:
 # job A generates a checkpoint chk1, then stops;
 # job B resumes from chk1, completes chk2, then stops;
 # job C resumes from chk1, completes chk3.

After we register chk2 and chk3 in one SharedRegistry, we'll delete some remote 
files by mistake, because chk2 and chk3 contain some sst files with the same 
name.
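
To make the scenario above concrete, here is a minimal sketch in Java. `ToyRegistry` is a hypothetical stand-in, not Flink's SharedStateRegistry: it keeps one remote path per key and, as a simplifying assumption, treats the older path as garbage when a different path arrives under the same key. The paths and the sst file name are invented for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class KeyCollisionSketch {
    /** Toy registry (hypothetical): one remote path per key; a different path under the same key evicts the old one. */
    static final class ToyRegistry {
        private final Map<String, String> pathByKey = new HashMap<>();
        final List<String> deleted = new ArrayList<>();

        void register(String key, String remotePath) {
            String previous = pathByKey.put(key, remotePath);
            if (previous != null && !previous.equals(remotePath)) {
                deleted.add(previous); // assumption: the older file is treated as garbage
            }
        }
    }

    /** Registers chk2 (job B) and chk3 (job C); each holds its own upload of the same local sst name. */
    static List<String> deletedFiles(boolean keyByRemotePath) {
        String fromJobB = "hdfs:///job-b/shared/uuid-b"; // hypothetical remote path
        String fromJobC = "hdfs:///job-c/shared/uuid-c"; // hypothetical remote path
        String localName = "000042.sst";                 // hypothetical local sst name
        ToyRegistry registry = new ToyRegistry();
        registry.register(keyByRemotePath ? fromJobB : localName, fromJobB);
        registry.register(keyByRemotePath ? fromJobC : localName, fromJobC);
        return registry.deleted;
    }

    public static void main(String[] args) {
        System.out.println("keyed by local name:  deleted " + deletedFiles(false));
        System.out.println("keyed by remote path: deleted " + deletedFiles(true));
    }
}
```

With the local name as key, job B's file is dropped even though chk2 still needs it; with the remote path as key, the two uploads never share a key, so nothing is dropped.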

> Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
> -
>
> Key: FLINK-29913
> URL: https://issues.apache.org/jira/browse/FLINK-29913
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.0, 1.16.0, 1.17.0
>Reporter: Yanfei Lei
>Assignee: Feifan Wang
>Priority: Major
> Fix For: 1.16.3, 1.17.2
>
>
> When maxConcurrentCheckpoint>1, the shared state of Incremental rocksdb state 
> backend would be discarded by registering the same name handle. See 
> [https://github.com/apache/flink/pull/21050#discussion_r1011061072]
> cc [~roman] 





[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1

2023-05-24 Thread Roman Khachatryan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17725693#comment-17725693
 ] 

Roman Khachatryan commented on FLINK-29913:
---

Great! Thanks [~Feifan Wang] 

> Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
> -
>
> Key: FLINK-29913
> URL: https://issues.apache.org/jira/browse/FLINK-29913
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.0, 1.16.0, 1.17.0
>Reporter: Yanfei Lei
>Assignee: Feifan Wang
>Priority: Major
> Fix For: 1.16.2, 1.17.2
>
>
> When maxConcurrentCheckpoint>1, the shared state of Incremental rocksdb state 
> backend would be discarded by registering the same name handle. See 
> [https://github.com/apache/flink/pull/21050#discussion_r1011061072]
> cc [~roman] 





[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1

2023-05-23 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17725624#comment-17725624
 ] 

Feifan Wang commented on FLINK-29913:
-

Thanks [~roman] , I will prepare a pr.



[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1

2023-05-23 Thread Roman Khachatryan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17725569#comment-17725569
 ] 

Roman Khachatryan commented on FLINK-29913:
---

Got it, thanks for clarifying. 

However, I think that using 
"Integer.toString(System.identityHashCode(stateHandle))" can easily lead to 
collisions, thereby causing checkpoint corruption.

But this seems to me an (important) implementation detail, that can be 
discussed in the PR. WDYT?



[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1

2023-05-23 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17725470#comment-17725470
 ] 

Feifan Wang commented on FLINK-29913:
-

Thanks for the clarification [~roman] ! 
{quote}Further, regarding the approach of using unique registry key, I agree 
with Congxian Qiu , we can just choose a stable register key generation method 
based on remote file name (such as use md5 digest of remote file name) , which 
can replace of 
IncrementalRemoteKeyedStateHandle#createSharedStateRegistryKeyFromFileName() .
The mapping of local sst file name to StreamStateHandle never changed , so the 
part of RocksDB recovery does not need to be changed.
{quote}
I mean we still use local file name as key of sharedState map in  
_*IncrementalRemoteKeyedStateHandle*_ and use remote file path when generating 
SharedStateRegisterKey. Changes in _*IncrementalRemoteKeyedStateHandle*_ like 
this :
{code:java}
...

// Still keyed by local file name; this is the "never changed" mapping mentioned above.
private final Map<StateHandleID, StreamStateHandle> sharedState;

...

public void registerSharedStates(SharedStateRegistry stateRegistry, long checkpointID) {
    ...
    for (Map.Entry<StateHandleID, StreamStateHandle> sharedStateHandle :
            sharedState.entrySet()) {

        // Changed line: derive the registry key from the remote handle, not the local file name.
        SharedStateRegistryKey registryKey =
                generateRegisterKey(sharedStateHandle.getValue());

        StreamStateHandle reference =
                stateRegistry.registerReference(
                        registryKey, sharedStateHandle.getValue(), checkpointID);

        sharedStateHandle.setValue(reference);
    }
}

private static SharedStateRegistryKey generateRegisterKey(StreamStateHandle stateHandle) {
    String keyString;
    if (stateHandle instanceof FileStateHandle) {
        keyString = ((FileStateHandle) stateHandle).getFilePath().toString();
    } else if (stateHandle instanceof ByteStreamStateHandle) {
        keyString = ((ByteStreamStateHandle) stateHandle).getHandleName();
    } else {
        keyString = Integer.toString(System.identityHashCode(stateHandle));
    }
    // md5sum is a placeholder helper; another digest algorithm may be used instead.
    return new SharedStateRegistryKey(md5sum(keyString));
}

{code}
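
The `md5sum(keyString)` step in the snippet above is left undefined. As a self-contained illustration, the sketch below derives a stable key string from a remote path using the JDK's `java.security.MessageDigest`. The class and method names (`RegistryKeySketch`, `keyFor`) and the example path are hypothetical and not part of Flink; MD5 is used only because the proposal mentions it, and any stable digest would serve.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

class RegistryKeySketch {
    /** Hex-encoded MD5 digest of the remote path; the same path always yields the same key. */
    static String keyFor(String remotePath) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(remotePath.getBytes(StandardCharsets.UTF_8));
            // Pad to 32 hex characters so that leading zero bytes are not lost.
            return String.format("%032x", new BigInteger(1, digest));
        } catch (NoSuchAlgorithmException e) {
            // MD5 is one of the algorithms every Java platform must provide.
            throw new IllegalStateException("MD5 digest unavailable", e);
        }
    }

    public static void main(String[] args) {
        // Hypothetical remote path, for illustration only.
        System.out.println(keyFor("hdfs:///checkpoints/job/shared/some-uuid"));
    }
}
```

Because the key depends only on the remote path, a re-uploaded file (which gets a new remote path) receives a new key, while repeated registrations of the same file map to the same key.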



[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1

2023-05-23 Thread Roman Khachatryan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17725418#comment-17725418
 ] 

Roman Khachatryan commented on FLINK-29913:
---

{quote}On a 64-bit system, about 7.63MB more memory will be used for every one 
million entries
Is there any other runtime overhead I missed ?
{quote}
I was more concerned about the additional time required to traverse the (mostly 
single-element) lists. When a checkpoint is subsumed, *all* entries need to be 
scanned. Adding pointer dereference(s) might break any optimizations that JVM 
and CPU would otherwise employ.

 
{quote}As for the complexity, this approach will indeed increase the operation 
of the linked list in the registerReference() method and 
unregisterUnusedState() method.
But given that this is easy to implement, and the implementation is cohesive, I 
think the complexity is acceptable.
{quote}
In my view, simplicity in this part is worth the effort. The problem that 
SharedStateRegistry solves is already tricky, and we shouldn't complicate it 
further (higher complexity potentially leads to more bugs and more maintenance 
effort). 

 
{quote}Further, regarding the approach of using unique registry key, I agree 
with Congxian Qiu , we can just choose a stable register key generation method 
based on remote file name (such as use md5 digest of remote file name) , which 
can replace of 
IncrementalRemoteKeyedStateHandle#createSharedStateRegistryKeyFromFileName() .
The mapping of local sst file name to StreamStateHandle never changed , so the 
part of RocksDB recovery does not need to be changed.
{quote}
I don't fully understand what "never changed" means here.
[Here|https://github.com/apache/flink/blob/fbf7b91424ec626ae56dd2477347a7759db6d5fe/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java#L105],
 the ID is used to create the local path. If we change the ID to the remote 
path, the local path will change too. Per my understanding, we cannot change 
the local path without updating metadata files.
Or am I missing something?

 
{quote}Whichever approach will be chosen, I am happy to implement it. Can you 
assign this ticket to me Roman Khachatryan ? looking forward to hearing from 
you.
{quote}
Sure, thanks for volunteering!

> Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
> -
>
> Key: FLINK-29913
> URL: https://issues.apache.org/jira/browse/FLINK-29913
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.0, 1.16.0
>Reporter: Yanfei Lei
>Priority: Minor
>
> When maxConcurrentCheckpoint>1, the shared state of Incremental rocksdb state 
> backend would be discarded by registering the same name handle. See 
> [https://github.com/apache/flink/pull/21050#discussion_r1011061072]
> cc [~roman] 





[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1

2023-05-23 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17725387#comment-17725387
 ] 

Feifan Wang commented on FLINK-29913:
-

One overhead I can see is that it will use more memory for storing the next 
pointer. On a 64-bit system, about 7.63MB more memory will be used for every 
one million entries, I think it is acceptable. Is there any other runtime 
overhead I missed ?
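
The 7.63MB figure above follows from one extra 8-byte reference per entry. A small sketch of the arithmetic is given below; the class and method names are hypothetical, and the 8-byte pointer size is the assumption stated in the comment (a 64-bit JVM without compressed references; with compressed references a pointer would typically take 4 bytes, halving the figure).

```java
class PointerOverheadSketch {
    /** Extra memory in MiB for one additional reference per entry. */
    static double extraMebibytes(long entries, int pointerBytes) {
        return entries * (double) pointerBytes / (1024 * 1024);
    }

    public static void main(String[] args) {
        // Assumption: 8-byte references, one million registry entries.
        System.out.printf("%.2f MiB%n", extraMebibytes(1_000_000L, 8));
    }
}
```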

As for the complexity, this approach will indeed increase the operation of the 
linked list in the _registerReference()_ method and _unregisterUnusedState()_ 
method. But given that this is easy to implement, and the implementation is 
cohesive, I think the complexity is acceptable.

 

Just to clarify, I think using a unique ID is also a valid approach, but I want 
to learn how you made the choice. Further, regarding the approach of using a 
unique registry key, I agree with [~klion26]: we can just choose a stable 
register key generation method based on the remote file name (such as the md5 
digest of the remote file name), which can replace 
IncrementalRemoteKeyedStateHandle#createSharedStateRegistryKeyFromFileName(). 
The mapping from local sst file name to StreamStateHandle never changes, so the 
RocksDB recovery part does not need to be changed.

 

Whichever approach will be chosen, I am happy to implement it. Can you assign 
this ticket to me  [~roman] ? looking forward to hearing from you.



[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1

2023-05-23 Thread Roman Khachatryan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17725336#comment-17725336
 ] 

Roman Khachatryan commented on FLINK-29913:
---

Thanks for the proposal [~Feifan Wang]. I think it's easier to implement 
compared to always-unique state IDs. OTOH, it complicates an already complex 
part of the system and has some runtime overhead. So I'd rather go with unique 
state IDs.

WDYT?

 

As for the priority, I'd change it to Major if there are no objections.



[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1

2023-05-23 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17725286#comment-17725286
 ] 

Feifan Wang commented on FLINK-29913:
-

Sorry, I didn't notice this ticket earlier, so I submitted a duplicate one.

[~Yanfei Lei], as for the priority, I agree with [~klion26]: since it may break 
the checkpoint in a valid use case, the priority should be at least *Major*.

[~roman] I think your proposal is valid, but I still want to offer an 
alternative:
 # Make SharedStateRegistry allow registering multiple state objects under the 
same key. The generation strategy for SharedStateRegistryKey is still 
determined by CompositeStateHandle, as it is now. Different state objects may 
be registered under the same key in a force-full checkpoint (active re-upload).
 # SharedStateRegistry maintains a linked entry list sorted by registration 
time for each key. A PlaceHolderStreamStateHandle will be replaced by the last 
one in the entry list.

In this approach, SharedStateRegistry doesn't discard any state object during 
registration. SharedStateRegistry only deletes a state object when its 
lastUsedCheckpoint is subsumed.
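
The alternative described above can be sketched roughly as follows. This is a toy model under stated assumptions, not Flink code: `MultiEntryRegistrySketch`, its method names, and the use of plain strings for keys and handles are all hypothetical. It only illustrates the two points of the proposal: registration never discards anything, and deletion happens only once an entry's last-used checkpoint has been subsumed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

class MultiEntryRegistrySketch {
    /** One registered state object plus the newest checkpoint that uses it. */
    static final class Entry {
        final String handle;
        long lastUsedCheckpoint;

        Entry(String handle, long lastUsedCheckpoint) {
            this.handle = handle;
            this.lastUsedCheckpoint = lastUsedCheckpoint;
        }
    }

    // Per key: entries in registration order (oldest first).
    private final Map<String, Deque<Entry>> entries = new HashMap<>();

    /** Never discards on registration: a new handle under an existing key is appended. */
    void register(String key, String handle, long checkpointId) {
        Deque<Entry> list = entries.computeIfAbsent(key, k -> new ArrayDeque<>());
        for (Entry e : list) {
            if (e.handle.equals(handle)) {
                e.lastUsedCheckpoint = Math.max(e.lastUsedCheckpoint, checkpointId);
                return;
            }
        }
        list.addLast(new Entry(handle, checkpointId));
    }

    /** A placeholder resolves to the most recently registered handle for the key. */
    String registerPlaceholder(String key, long checkpointId) {
        Deque<Entry> list = entries.get(key);
        if (list == null) {
            throw new IllegalStateException("no state registered under " + key);
        }
        Entry last = list.getLast();
        last.lastUsedCheckpoint = Math.max(last.lastUsedCheckpoint, checkpointId);
        return last.handle;
    }

    /** Returns the handles whose last use is older than the lowest retained checkpoint. */
    List<String> unregisterUnusedState(long lowestRetainedCheckpoint) {
        List<String> toDelete = new ArrayList<>();
        Iterator<Deque<Entry>> it = entries.values().iterator();
        while (it.hasNext()) {
            Deque<Entry> list = it.next();
            list.removeIf(e -> {
                boolean unused = e.lastUsedCheckpoint < lowestRetainedCheckpoint;
                if (unused) {
                    toDelete.add(e.handle);
                }
                return unused;
            });
            if (list.isEmpty()) {
                it.remove();
            }
        }
        return toDelete;
    }

    public static void main(String[] args) {
        MultiEntryRegistrySketch registry = new MultiEntryRegistrySketch();
        registry.register("000042.sst", "upload-1", 1); // first upload
        registry.register("000042.sst", "upload-2", 2); // forced re-upload, same key
        System.out.println(registry.registerPlaceholder("000042.sst", 3));
        System.out.println(registry.unregisterUnusedState(2));
    }
}
```

The per-key list is where the extra pointer traversal discussed later in the thread comes from: every subsumption has to walk these (mostly single-element) lists.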



[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1

2022-11-14 Thread Roman Khachatryan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17633672#comment-17633672
 ] 

Roman Khachatryan commented on FLINK-29913:
---

[~klion26], we could use the remote filename as well, but we would still have 
to store the mapping from remote to local to initialize RocksDB on recovery.

UUIDs are slightly better IMO because they are usually shorter and therefore 
consume less space in the final _metadata file and as keys in 
SharedStateRegistry.

[~Yanfei Lei] , good question. In case of re-upload, the old state should be 
discarded eventually when the checkpoint it was created for is subsumed.



[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1

2022-11-14 Thread Yanfei Lei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17633657#comment-17633657
 ] 

Yanfei Lei commented on FLINK-29913:


[~roman] Great proposal!  I think that maintaining the mapping of  is necessary, because RocksDB needs the file name(xxx.sst) to 
rebuild its instance.

And I have a question about "always generate unique state handle IDs":

If the confirmation notification is missed and the TM re-uploads some state 
handles, how do we delete those duplicate state handles?

 



[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1

2022-11-13 Thread Congxian Qiu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17633506#comment-17633506
 ] 

Congxian Qiu commented on FLINK-29913:
--

Sorry for the late reply.

[~Yanfei Lei], as for the priority: IMHO, if the user sets 
{{maxConcurrentCheckpoint > 1 && MAX_RETAINED_CHECKPOINTS > 1}}, then the 
checkpoints may be broken and the job can't restore from the checkpoint because 
of a {{FileNotFoundException}}, so I think the priority deserves to be 
escalated.

[~roman] your proposal seems valid from my perspective. Maybe changing the 
logic for generating the registry key (perhaps using the filename in the 
remote filesystem) is enough to solve the problem here?

Please let me know what you think about this, thanks.



[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1

2022-11-09 Thread Roman Khachatryan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17631234#comment-17631234
 ] 

Roman Khachatryan commented on FLINK-29913:
---

Thanks a lot for noticing and reporting this issue [~Yanfei Lei] and [~klion26]!

I think there is a (conceptually) simple solution: always generate unique state 
handle IDs.
(It was already discussed offline before as it could solve some similar 
problems like detecting duplicates in SharedStateRegistry).

The ID doesn't have to have any semantic meaning. 
RocksDB happens to [store the local path in 
ID|https://github.com/apache/flink/blob/421f057a7488fd64854a82424755f76b89561a0b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java#L397]
 and then uses it [on 
recovery|https://github.com/apache/flink/blob/421f057a7488fd64854a82424755f76b89561a0b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java#L105];
 but it can be a separate field in the handle as well.
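
A rough sketch of that idea: the registry ID becomes an opaque unique value, and the local file name that RocksDB needs on recovery travels in its own field. The class `UniqueHandleIdSketch`, the nested `SharedHandle` type, and the example paths are hypothetical and only illustrate the separation; they are not Flink's handle classes.

```java
import java.util.UUID;

class UniqueHandleIdSketch {
    /** Hypothetical handle: opaque unique ID for the registry, local file name kept separately. */
    static final class SharedHandle {
        final String registryId;    // no semantic meaning, unique per upload
        final String localFileName; // needed to rebuild the RocksDB instance on recovery
        final String remotePath;    // where the uploaded file lives

        SharedHandle(String registryId, String localFileName, String remotePath) {
            this.registryId = registryId;
            this.localFileName = localFileName;
            this.remotePath = remotePath;
        }
    }

    /** Every upload gets a fresh ID, even when the same local file is uploaded again. */
    static SharedHandle upload(String localFileName, String remotePath) {
        return new SharedHandle(UUID.randomUUID().toString(), localFileName, remotePath);
    }

    public static void main(String[] args) {
        // Hypothetical file name and remote paths, for illustration only.
        SharedHandle first = upload("000042.sst", "hdfs:///chk/shared/a");
        SharedHandle again = upload("000042.sst", "hdfs:///chk/shared/b");
        System.out.println(first.registryId.equals(again.registryId)); // distinct IDs expected
        System.out.println(first.localFileName.equals(again.localFileName));
    }
}
```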

I don't see any issues with this approach, including recovery, rescaling, and 
JM failover cases.

What do you think? Maybe there are some alternatives?






[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1

2022-11-07 Thread Yanfei Lei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17629771#comment-17629771
 ] 

Yanfei Lei commented on FLINK-29913:
---

[~klion26] Since configurations with maxConcurrentCheckpoint>1 && 
MAX_CONCURRENT_CHECKPOINTS>1 are less frequently used, I set the priority to 
Minor. Under the current checkpoint registration and deletion mechanism, I 
think this bug will take a while to fix; I'd like to fix it after some 
discussion.






[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1

2022-11-07 Thread Congxian Qiu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17629759#comment-17629759
 ] 

Congxian Qiu commented on FLINK-29913:
--

[~Yanfei Lei] thanks for creating this ticket and the IT case. Would you like 
to contribute a fix for this problem?

As for the priority: since this may lead to a {{FileNotFoundException}} when 
{{maxConcurrentCheckpoint > 1}}, I think it needs to be at least Critical. 
What do you think?






[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1

2022-11-06 Thread Yanfei Lei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17629630#comment-17629630
 ] 

Yanfei Lei commented on FLINK-29913:
---

I added 
[ConcurrentCheckpointITCase|https://github.com/apache/flink/commit/db27669d8bbfa995d961fae32f46bc4df676afb7]
 to reproduce this problem at 
https://github.com/fredia/flink/tree/test_concurrent_chk.



