[jira] [Created] (FLINK-36325) Implement basic restore from checkpoint for ForStStateBackend

2024-09-19 Thread Feifan Wang (Jira)
Feifan Wang created FLINK-36325:
---

 Summary: Implement basic restore from checkpoint for 
ForStStateBackend
 Key: FLINK-36325
 URL: https://issues.apache.org/jira/browse/FLINK-36325
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends
Reporter: Feifan Wang


As the title says: implement basic restore from checkpoint for 
ForStStateBackend. Rescaling will be implemented later.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35510) Implement basic incremental checkpoint for ForStStateBackend

2024-06-03 Thread Feifan Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feifan Wang updated FLINK-35510:

Parent: FLINK-34982
Issue Type: Sub-task  (was: New Feature)

> Implement basic incremental checkpoint for ForStStateBackend
> 
>
> Key: FLINK-35510
> URL: https://issues.apache.org/jira/browse/FLINK-35510
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Reporter: Feifan Wang
>Priority: Major
>
> Use the low-level DB API to implement a basic incremental checkpoint for 
> ForStStateBackend, following these steps:
>  # db.disableFileDeletions()
>  # db.getLiveFiles(true)
>  # db.enableFileDeletions(false)
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35510) Implement basic incremental checkpoint for ForStStateBackend

2024-06-03 Thread Feifan Wang (Jira)
Feifan Wang created FLINK-35510:
---

 Summary: Implement basic incremental checkpoint for 
ForStStateBackend
 Key: FLINK-35510
 URL: https://issues.apache.org/jira/browse/FLINK-35510
 Project: Flink
  Issue Type: New Feature
  Components: Runtime / State Backends
Reporter: Feifan Wang


Use the low-level DB API to implement a basic incremental checkpoint for 
ForStStateBackend, following these steps (see the sketch below):
 # db.disableFileDeletions()
 # db.getLiveFiles(true)
 # db.enableFileDeletions(false)
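
For illustration, a minimal sketch of these three steps, assuming ForSt exposes 
the same JNI surface as RocksDB's org.rocksdb.RocksDB (the wrapper class, method 
name, and the upload step are illustrative, not part of this ticket):
{code:java}
import java.util.List;

import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public final class IncrementalSnapshotSketch {

    // Pin the current set of files, list them, then re-enable deletions.
    public static List<String> listLiveFilesForUpload(RocksDB db) throws RocksDBException {
        db.disableFileDeletions();           // 1. stop compaction from deleting files
        try {
            // 2. flush the memtable and list all live SST/MANIFEST file names
            RocksDB.LiveFiles liveFiles = db.getLiveFiles(true);
            // the caller would upload only files missing from the previous checkpoint
            return liveFiles.files;
        } finally {
            db.enableFileDeletions(false);   // 3. allow file deletions again
        }
    }
}
{code}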

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35434) Support pass exception in StateExecutor to runtime

2024-05-23 Thread Feifan Wang (Jira)
Feifan Wang created FLINK-35434:
---

 Summary: Support pass exception in StateExecutor to runtime
 Key: FLINK-35434
 URL: https://issues.apache.org/jira/browse/FLINK-35434
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends
Reporter: Feifan Wang


An exception may be thrown when the _StateExecutor_ executes a state request, 
such as an IOException. We should pass the exception to the runtime and fail 
the job in this situation.

 
_InternalStateFuture#completeExceptionally()_ will be added as per the 
[discussion here|https://github.com/apache/flink/pull/24739#discussion_r1590633134].
_ForStWriteBatchOperation_ and _ForStGeneralMultiGetOperation_ will then call 
this method when an exception occurs.
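
A minimal sketch of the intended pattern, with simplified stand-in types (the 
real interfaces are _InternalStateFuture_ and the ForSt operation classes; all 
names below are illustrative):
{code:java}
import java.io.IOException;
import java.util.concurrent.CompletableFuture;

public final class StateRequestSketch {

    // Execute a state request and surface any failure through the future,
    // so the runtime can fail the job instead of waiting on a lost result.
    static CompletableFuture<byte[]> executeGet(byte[] key) {
        CompletableFuture<byte[]> resultFuture = new CompletableFuture<>();
        try {
            resultFuture.complete(readFromDb(key));  // may throw IOException
        } catch (IOException e) {
            resultFuture.completeExceptionally(e);   // propagate to the runtime
        }
        return resultFuture;
    }

    private static byte[] readFromDb(byte[] key) throws IOException {
        throw new IOException("simulated DB failure"); // placeholder for a real read
    }
}
{code}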



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32086) Cleanup non-reported managed directory on exit of TM

2024-04-22 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839592#comment-17839592
 ] 

Feifan Wang commented on FLINK-32086:
-

Hi [~Zakelly] , are you still working on this ticket? We want to solve this in 
1.20; I can take it over if you don't mind.

> Cleanup non-reported managed directory on exit of TM
> 
>
> Key: FLINK-32086
> URL: https://issues.apache.org/jira/browse/FLINK-32086
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.18.0
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35041) IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed

2024-04-11 Thread Feifan Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feifan Wang updated FLINK-35041:

Description: 
{code:java}
Apr 08 03:22:45 03:22:45.450 [ERROR] 
org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration
 -- Time elapsed: 0.034 s <<< FAILURE!
Apr 08 03:22:45 org.opentest4j.AssertionFailedError: 
Apr 08 03:22:45 
Apr 08 03:22:45 expected: false
Apr 08 03:22:45  but was: true
Apr 08 03:22:45 at 
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
Apr 08 03:22:45 at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
Apr 08 03:22:45 at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
Apr 08 03:22:45 at 
org.apache.flink.runtime.state.DiscardRecordedStateObject.verifyDiscard(DiscardRecordedStateObject.java:34)
Apr 08 03:22:45 at 
org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration(IncrementalRemoteKeyedStateHandleTest.java:211)
Apr 08 03:22:45 at java.lang.reflect.Method.invoke(Method.java:498)
Apr 08 03:22:45 at 
java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
Apr 08 03:22:45 at 
java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
Apr 08 03:22:45 at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
Apr 08 03:22:45 at 
java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
Apr 08 03:22:45 at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)

{code}
[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58782&view=logs&j=77a9d8e1-d610-59b3-fc2a-4766541e0e33&t=125e07e7-8de0-5c6c-a541-a567415af3ef&l=9238]
 

  was:
{code:java}
Apr 08 03:22:45 03:22:45.450 [ERROR] 
org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration
 -- Time elapsed: 0.034 s <<< FAILURE!
Apr 08 03:22:45 org.opentest4j.AssertionFailedError: 
Apr 08 03:22:45 
Apr 08 03:22:45 expected: false
Apr 08 03:22:45  but was: true
Apr 08 03:22:45 at 
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
Apr 08 03:22:45 at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
Apr 08 03:22:45 at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
Apr 08 03:22:45 at 
org.apache.flink.runtime.state.DiscardRecordedStateObject.verifyDiscard(DiscardRecordedStateObject.java:34)
Apr 08 03:22:45 at 
org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration(IncrementalRemoteKeyedStateHandleTest.java:211)
Apr 08 03:22:45 at java.lang.reflect.Method.invoke(Method.java:498)
Apr 08 03:22:45 at 
java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
Apr 08 03:22:45 at 
java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
Apr 08 03:22:45 at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
Apr 08 03:22:45 at 
java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
Apr 08 03:22:45 at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)

{code}


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58782&view=logs&j=77a9d8e1-d610-59b3-fc2a-4766541e0e33&t=125e07e7-8de0-5c6c-a541-a567415af3ef&l=9238


> IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed
> --
>
> Key: FLINK-35041
> URL: https://issues.apache.org/jira/browse/FLINK-35041
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI
>Affects Versions: 1.20.0
>Reporter: Weijie Guo
>Priority: Blocker
>
> {code:java}
> Apr 08 03:22:45 03:22:45.450 [ERROR] 
> org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration
>  -- Time elapsed: 0.034 s <<< FAILURE!
> Apr 08 03:22:45 org.opentest4j.AssertionFailedError: 
> Apr 08 03:22:45 
> Apr 08 03:22:45 expected: false
> Apr 08 03:22:45  but was: true
> Apr 08 03:22:45   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> Apr 08 03:22:45   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> Apr 08 03:22:45   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> Apr 08 03:22:45   at 
> org.apache.flink.runtime.state.DiscardRecordedStateObject.verifyDiscard(DiscardRecordedStateObject.java:34)
> Apr 08 03:22:45   at 
> org.apache.flink.runtime.state.IncrementalRemoteKeyedSt

[jira] [Commented] (FLINK-33734) Merge unaligned checkpoint state handle

2024-03-25 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17830472#comment-17830472
 ] 

Feifan Wang commented on FLINK-33734:
-

Kindly ping [~Zakelly] , [~roman] .

> Merge unaligned checkpoint state handle
> ---
>
> Key: FLINK-33734
> URL: https://issues.apache.org/jira/browse/FLINK-33734
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: Feifan Wang
>Assignee: Feifan Wang
>Priority: Major
>  Labels: pull-request-available
> Attachments: flamegraph.control-group.html, 
> flamegraph.merge-handle-and-serialize-on-tm.html, 
> flamegraph.only-merge-handle.html, image-2024-02-18-15-12-20-665.png
>
>
> h3. Background
> Unaligned checkpoint will write the inflight-data of all InputChannel and 
> ResultSubpartition of the same subtask to the same file during checkpoint. 
> The InputChannelStateHandle and ResultSubpartitionStateHandle organize the 
> metadata of inflight-data at the channel granularity, which causes the file 
> name to be repeated many times. When a job is under backpressure and task 
> parallelism is high, the metadata of unaligned checkpoints will bloat. This 
> will result in:
>  # The amount of data reported by taskmanager to jobmanager increases, and 
> jobmanager takes longer to process these RPC requests.
>  # The metadata of the entire checkpoint becomes very large, and it takes 
> longer to serialize and write it to dfs.
> Both of the above points ultimately lead to longer checkpoint duration.
> h3. A Production example
> Take our production job with a parallelism of 4800 as an example:
>  # When there is no back pressure, checkpoint end-to-end duration is within 7 
> seconds.
>  # When under pressure: checkpoint end-to-end duration often exceeds 1 
> minute. We found that jobmanager took more than 40 seconds to process rpc 
> requests, and serialized metadata took more than 20 seconds. Some checkpoint 
> statistics:
> |metadata file size|950 MB|
> |channel state count|12,229,854|
> |channel file count|5536|
> Of the 950MB in the metadata file, 68% are redundant file paths.
> We enabled log-based checkpoint on this job and hoped that the checkpoint 
> could be completed within 30 seconds. This problem made it difficult to 
> achieve this goal.
> h3. Propose changes
> I suggest introducing MergedInputChannelStateHandle and 
> MergedResultSubpartitionStateHandle to eliminate redundant file paths.
> The taskmanager merges all InputChannelStateHandles with the same delegated 
> StreamStateHandle in the same subtask into one MergedInputChannelStateHandle 
> before reporting. When recovering from checkpoint, jobmanager converts 
> MergedInputChannelStateHandle to InputChannelStateHandle collection before 
> assigning state handle, and the rest of the process does not need to be 
> changed. 
> Structure of MergedInputChannelStateHandle :
>  
> {code:java}
> {   // MergedInputChannelStateHandle
>     "delegate": {
>         "filePath": 
> "viewfs://hadoop-meituan/flink-yg15/checkpoints/retained/1234567/ab8d0c2f02a47586490b15e7a2c30555/chk-31/ffe54c0a-9b6e-4724-aae7-61b96bf8b1cf",
>         "stateSize": 123456
>     },
>     "size": 2000,
>     "subtaskIndex":0,
>     "channels": [ // One InputChannel per element
>         {
>             "info": {
>                 "gateIdx": 0,
>                 "inputChannelIdx": 0
>             },
>             "offsets": [
>                 100,200,300,400
>             ],
>             "size": 1400
>         },
>         {
>             "info": {
>                 "gateIdx": 0,
>                 "inputChannelIdx": 1
>             },
>             "offsets": [
>                 500,600
>             ],
>             "size": 600
>         }
>     ]
> }
>  {code}
> MergedResultSubpartitionStateHandle is similar.
>  
>  
> WDYT [~roman] , [~pnowojski] , [~fanrui] ?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33734) Merge unaligned checkpoint state handle

2024-02-18 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818283#comment-17818283
 ] 

Feifan Wang commented on FLINK-33734:
-

[~roman] :
{quote}Do I understand correctly, that the improvement now comes mostly from 
moving `offsets` serialization from JM to TM?
{quote}
Yes, 68% of the time reduction comes from offloading offset serialization to TM.
{quote}I'm wondering whether it would also make sense to move storing the 
handle from JM to TM and only return a pointer to it.
{quote}
I think it also makes sense, but this approach would make rescaling more difficult.
{quote}Do you have the numbers for RPC times?
{quote}
I haven't tracked the RPC processing time, so I don't have specific figures. 
The earlier judgment that RPC took a long time came from the flame graphs:

[^flamegraph.control-group.html]

[^flamegraph.only-merge-handle.html]

[^flamegraph.merge-handle-and-serialize-on-tm.html]
{quote}As for ByteStreamStateHandle, we may need to change channel granular 
splitting to subtask granular splitting.
{quote}
What I originally wanted to say here is that with the merged channel state 
handle, we don't need to extract bytes for each channel. This may not be 
critical at this point, so we can leave it alone.

 

[~Zakelly] :
{quote}Overall it makes sense to offload file creation from JM to TM
{quote}
In fact, this PR does not offload the creation of the metadata file to the TM; 
it only reorganizes the handles and offloads the serialization of the offsets 
within them to the TM.

The test that produced the above results was conducted on Flink 1.16.1. I 
constructed a job with a parallelism of 2000 and deliberately limited the 
throughput of the sink operator to simulate back pressure. The following 
figure shows the topology of the test job.

!https://km.sankuai.com/api/file/cdn/2059636095/80577277714?contentType=1&isNewContent=false|width=670,height=104!
{quote}I'm curious about why the checkpoint time reduced by 36s (from 66s to 
30s) when serializing in TM side while the metadata time is only 12s.
{quote}
The time saved on metadata serialization is only 12s, but the change also 
reduces the RPC processing time.
{quote}And how the long-tail of async duration happened, is it due to massive 
file creation requests and hotspot issue in NN node of HDFS?
{quote}
The test job ran on a shared HDFS cluster with many other jobs, so there were 
many uncontrollable factors, and I did not investigate them closely.

> Merge unaligned checkpoint state handle
> ---
>
> Key: FLINK-33734
> URL: https://issues.apache.org/jira/browse/FLINK-33734
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: Feifan Wang
>Assignee: Feifan Wang
>Priority: Major
>  Labels: pull-request-available
> Attachments: flamegraph.control-group.html, 
> flamegraph.merge-handle-and-serialize-on-tm.html, 
> flamegraph.only-merge-handle.html, image-2024-02-18-15-12-20-665.png
>
>
> h3. Background
> Unaligned checkpoint will write the inflight-data of all InputChannel and 
> ResultSubpartition of the same subtask to the same file during checkpoint. 
> The InputChannelStateHandle and ResultSubpartitionStateHandle organize the 
> metadata of inflight-data at the channel granularity, which causes the file 
> name to be repeated many times. When a job is under backpressure and task 
> parallelism is high, the metadata of unaligned checkpoints will bloat. This 
> will result in:
>  # The amount of data reported by taskmanager to jobmanager increases, and 
> jobmanager takes longer to process these RPC requests.
>  # The metadata of the entire checkpoint becomes very large, and it takes 
> longer to serialize and write it to dfs.
> Both of the above points ultimately lead to longer checkpoint duration.
> h3. A Production example
> Take our production job with a parallelism of 4800 as an example:
>  # When there is no back pressure, checkpoint end-to-end duration is within 7 
> seconds.
>  # When under pressure: checkpoint end-to-end duration often exceeds 1 
> minute. We found that jobmanager took more than 40 seconds to process rpc 
> requests, and serialized metadata took more than 20 seconds. Some checkpoint 
> statistics:
> |metadata file size|950 MB|
> |channel state count|12,229,854|
> |channel file count|5536|
> Of the 950MB in the metadata file, 68% are redundant file paths.
> We enabled log-based checkpoint on this job and hoped that the checkpoint 
> could be completed within 30 seconds. This problem made it difficult to 
> achieve this goal.
> h3. Propose changes
> I suggest introducing MergedInputChannelStateHandle and 
> MergedResultSubpartitionStateHandle to eliminate redundant file paths.
> The taskmanager merges all InputChannelStateHand

[jira] [Updated] (FLINK-33734) Merge unaligned checkpoint state handle

2024-02-18 Thread Feifan Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feifan Wang updated FLINK-33734:

Attachment: flamegraph.control-group.html
flamegraph.merge-handle-and-serialize-on-tm.html
flamegraph.only-merge-handle.html

> Merge unaligned checkpoint state handle
> ---
>
> Key: FLINK-33734
> URL: https://issues.apache.org/jira/browse/FLINK-33734
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: Feifan Wang
>Assignee: Feifan Wang
>Priority: Major
>  Labels: pull-request-available
> Attachments: flamegraph.control-group.html, 
> flamegraph.merge-handle-and-serialize-on-tm.html, 
> flamegraph.only-merge-handle.html, image-2024-02-18-15-12-20-665.png
>
>
> h3. Background
> Unaligned checkpoint will write the inflight-data of all InputChannel and 
> ResultSubpartition of the same subtask to the same file during checkpoint. 
> The InputChannelStateHandle and ResultSubpartitionStateHandle organize the 
> metadata of inflight-data at the channel granularity, which causes the file 
> name to be repeated many times. When a job is under backpressure and task 
> parallelism is high, the metadata of unaligned checkpoints will bloat. This 
> will result in:
>  # The amount of data reported by taskmanager to jobmanager increases, and 
> jobmanager takes longer to process these RPC requests.
>  # The metadata of the entire checkpoint becomes very large, and it takes 
> longer to serialize and write it to dfs.
> Both of the above points ultimately lead to longer checkpoint duration.
> h3. A Production example
> Take our production job with a parallelism of 4800 as an example:
>  # When there is no back pressure, checkpoint end-to-end duration is within 7 
> seconds.
>  # When under pressure: checkpoint end-to-end duration often exceeds 1 
> minute. We found that jobmanager took more than 40 seconds to process rpc 
> requests, and serialized metadata took more than 20 seconds. Some checkpoint 
> statistics:
> |metadata file size|950 MB|
> |channel state count|12,229,854|
> |channel file count|5536|
> Of the 950MB in the metadata file, 68% are redundant file paths.
> We enabled log-based checkpoint on this job and hoped that the checkpoint 
> could be completed within 30 seconds. This problem made it difficult to 
> achieve this goal.
> h3. Propose changes
> I suggest introducing MergedInputChannelStateHandle and 
> MergedResultSubpartitionStateHandle to eliminate redundant file paths.
> The taskmanager merges all InputChannelStateHandles with the same delegated 
> StreamStateHandle in the same subtask into one MergedInputChannelStateHandle 
> before reporting. When recovering from checkpoint, jobmanager converts 
> MergedInputChannelStateHandle to InputChannelStateHandle collection before 
> assigning state handle, and the rest of the process does not need to be 
> changed. 
> Structure of MergedInputChannelStateHandle :
>  
> {code:java}
> {   // MergedInputChannelStateHandle
>     "delegate": {
>         "filePath": 
> "viewfs://hadoop-meituan/flink-yg15/checkpoints/retained/1234567/ab8d0c2f02a47586490b15e7a2c30555/chk-31/ffe54c0a-9b6e-4724-aae7-61b96bf8b1cf",
>         "stateSize": 123456
>     },
>     "size": 2000,
>     "subtaskIndex":0,
>     "channels": [ // One InputChannel per element
>         {
>             "info": {
>                 "gateIdx": 0,
>                 "inputChannelIdx": 0
>             },
>             "offsets": [
>                 100,200,300,400
>             ],
>             "size": 1400
>         },
>         {
>             "info": {
>                 "gateIdx": 0,
>                 "inputChannelIdx": 1
>             },
>             "offsets": [
>                 500,600
>             ],
>             "size": 600
>         }
>     ]
> }
>  {code}
> MergedResultSubpartitionStateHandle is similar.
>  
>  
> WDYT [~roman] , [~pnowojski] , [~fanrui] ?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33734) Merge unaligned checkpoint state handle

2024-02-17 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818246#comment-17818246
 ] 

Feifan Wang commented on FLINK-33734:
-

Hi [~fanrui] , I reran the test job and found some long "Async Duration" 
values. The checkpoint storage is HDFS; I think the bottleneck now is 
long-tail latency.

!image-2024-02-18-15-12-20-665.png|width=860,height=200!

> Merge unaligned checkpoint state handle
> ---
>
> Key: FLINK-33734
> URL: https://issues.apache.org/jira/browse/FLINK-33734
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: Feifan Wang
>Assignee: Feifan Wang
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2024-02-18-15-12-20-665.png
>
>
> h3. Background
> Unaligned checkpoint will write the inflight-data of all InputChannel and 
> ResultSubpartition of the same subtask to the same file during checkpoint. 
> The InputChannelStateHandle and ResultSubpartitionStateHandle organize the 
> metadata of inflight-data at the channel granularity, which causes the file 
> name to be repeated many times. When a job is under backpressure and task 
> parallelism is high, the metadata of unaligned checkpoints will bloat. This 
> will result in:
>  # The amount of data reported by taskmanager to jobmanager increases, and 
> jobmanager takes longer to process these RPC requests.
>  # The metadata of the entire checkpoint becomes very large, and it takes 
> longer to serialize and write it to dfs.
> Both of the above points ultimately lead to longer checkpoint duration.
> h3. A Production example
> Take our production job with a parallelism of 4800 as an example:
>  # When there is no back pressure, checkpoint end-to-end duration is within 7 
> seconds.
>  # When under pressure: checkpoint end-to-end duration often exceeds 1 
> minute. We found that jobmanager took more than 40 seconds to process rpc 
> requests, and serialized metadata took more than 20 seconds. Some checkpoint 
> statistics:
> |metadata file size|950 MB|
> |channel state count|12,229,854|
> |channel file count|5536|
> Of the 950MB in the metadata file, 68% are redundant file paths.
> We enabled log-based checkpoint on this job and hoped that the checkpoint 
> could be completed within 30 seconds. This problem made it difficult to 
> achieve this goal.
> h3. Propose changes
> I suggest introducing MergedInputChannelStateHandle and 
> MergedResultSubpartitionStateHandle to eliminate redundant file paths.
> The taskmanager merges all InputChannelStateHandles with the same delegated 
> StreamStateHandle in the same subtask into one MergedInputChannelStateHandle 
> before reporting. When recovering from checkpoint, jobmanager converts 
> MergedInputChannelStateHandle to InputChannelStateHandle collection before 
> assigning state handle, and the rest of the process does not need to be 
> changed. 
> Structure of MergedInputChannelStateHandle :
>  
> {code:java}
> {   // MergedInputChannelStateHandle
>     "delegate": {
>         "filePath": 
> "viewfs://hadoop-meituan/flink-yg15/checkpoints/retained/1234567/ab8d0c2f02a47586490b15e7a2c30555/chk-31/ffe54c0a-9b6e-4724-aae7-61b96bf8b1cf",
>         "stateSize": 123456
>     },
>     "size": 2000,
>     "subtaskIndex":0,
>     "channels": [ // One InputChannel per element
>         {
>             "info": {
>                 "gateIdx": 0,
>                 "inputChannelIdx": 0
>             },
>             "offsets": [
>                 100,200,300,400
>             ],
>             "size": 1400
>         },
>         {
>             "info": {
>                 "gateIdx": 0,
>                 "inputChannelIdx": 1
>             },
>             "offsets": [
>                 500,600
>             ],
>             "size": 600
>         }
>     ]
> }
>  {code}
> MergedResultSubpartitionStateHandle is similar.
>  
>  
> WDYT [~roman] , [~pnowojski] , [~fanrui] ?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33734) Merge unaligned checkpoint state handle

2024-02-17 Thread Feifan Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feifan Wang updated FLINK-33734:

Attachment: image-2024-02-18-15-12-20-665.png

> Merge unaligned checkpoint state handle
> ---
>
> Key: FLINK-33734
> URL: https://issues.apache.org/jira/browse/FLINK-33734
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: Feifan Wang
>Assignee: Feifan Wang
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2024-02-18-15-12-20-665.png
>
>
> h3. Background
> Unaligned checkpoint will write the inflight-data of all InputChannel and 
> ResultSubpartition of the same subtask to the same file during checkpoint. 
> The InputChannelStateHandle and ResultSubpartitionStateHandle organize the 
> metadata of inflight-data at the channel granularity, which causes the file 
> name to be repeated many times. When a job is under backpressure and task 
> parallelism is high, the metadata of unaligned checkpoints will bloat. This 
> will result in:
>  # The amount of data reported by taskmanager to jobmanager increases, and 
> jobmanager takes longer to process these RPC requests.
>  # The metadata of the entire checkpoint becomes very large, and it takes 
> longer to serialize and write it to dfs.
> Both of the above points ultimately lead to longer checkpoint duration.
> h3. A Production example
> Take our production job with a parallelism of 4800 as an example:
>  # When there is no back pressure, checkpoint end-to-end duration is within 7 
> seconds.
>  # When under pressure: checkpoint end-to-end duration often exceeds 1 
> minute. We found that jobmanager took more than 40 seconds to process rpc 
> requests, and serialized metadata took more than 20 seconds. Some checkpoint 
> statistics:
> |metadata file size|950 MB|
> |channel state count|12,229,854|
> |channel file count|5536|
> Of the 950MB in the metadata file, 68% are redundant file paths.
> We enabled log-based checkpoint on this job and hoped that the checkpoint 
> could be completed within 30 seconds. This problem made it difficult to 
> achieve this goal.
> h3. Propose changes
> I suggest introducing MergedInputChannelStateHandle and 
> MergedResultSubpartitionStateHandle to eliminate redundant file paths.
> The taskmanager merges all InputChannelStateHandles with the same delegated 
> StreamStateHandle in the same subtask into one MergedInputChannelStateHandle 
> before reporting. When recovering from checkpoint, jobmanager converts 
> MergedInputChannelStateHandle to InputChannelStateHandle collection before 
> assigning state handle, and the rest of the process does not need to be 
> changed. 
> Structure of MergedInputChannelStateHandle :
>  
> {code:java}
> {   // MergedInputChannelStateHandle
>     "delegate": {
>         "filePath": 
> "viewfs://hadoop-meituan/flink-yg15/checkpoints/retained/1234567/ab8d0c2f02a47586490b15e7a2c30555/chk-31/ffe54c0a-9b6e-4724-aae7-61b96bf8b1cf",
>         "stateSize": 123456
>     },
>     "size": 2000,
>     "subtaskIndex":0,
>     "channels": [ // One InputChannel per element
>         {
>             "info": {
>                 "gateIdx": 0,
>                 "inputChannelIdx": 0
>             },
>             "offsets": [
>                 100,200,300,400
>             ],
>             "size": 1400
>         },
>         {
>             "info": {
>                 "gateIdx": 0,
>                 "inputChannelIdx": 1
>             },
>             "offsets": [
>                 500,600
>             ],
>             "size": 600
>         }
>     ]
> }
>  {code}
> MergedResultSubpartitionStateHandle is similar.
>  
>  
> WDYT [~roman] , [~pnowojski] , [~fanrui] ?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33734) Merge unaligned checkpoint state handle

2024-02-17 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818227#comment-17818227
 ] 

Feifan Wang commented on FLINK-33734:
-

Sorry to keep you waiting, [~fanrui] . Producing a checkpoint consists of 
multiple stages, and this PR only solves the problem of metadata expansion. 
Other stages may still take too long, such as uploading files to HDFS. As for 
the bottleneck after this PR, I haven't taken the time to investigate; if 
necessary, I will take a look.

> Merge unaligned checkpoint state handle
> ---
>
> Key: FLINK-33734
> URL: https://issues.apache.org/jira/browse/FLINK-33734
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: Feifan Wang
>Assignee: Feifan Wang
>Priority: Major
>  Labels: pull-request-available
>
> h3. Background
> Unaligned checkpoint will write the inflight-data of all InputChannel and 
> ResultSubpartition of the same subtask to the same file during checkpoint. 
> The InputChannelStateHandle and ResultSubpartitionStateHandle organize the 
> metadata of inflight-data at the channel granularity, which causes the file 
> name to be repeated many times. When a job is under backpressure and task 
> parallelism is high, the metadata of unaligned checkpoints will bloat. This 
> will result in:
>  # The amount of data reported by taskmanager to jobmanager increases, and 
> jobmanager takes longer to process these RPC requests.
>  # The metadata of the entire checkpoint becomes very large, and it takes 
> longer to serialize and write it to dfs.
> Both of the above points ultimately lead to longer checkpoint duration.
> h3. A Production example
> Take our production job with a parallelism of 4800 as an example:
>  # When there is no back pressure, checkpoint end-to-end duration is within 7 
> seconds.
>  # When under pressure: checkpoint end-to-end duration often exceeds 1 
> minute. We found that jobmanager took more than 40 seconds to process rpc 
> requests, and serialized metadata took more than 20 seconds. Some checkpoint 
> statistics:
> |metadata file size|950 MB|
> |channel state count|12,229,854|
> |channel file count|5536|
> Of the 950MB in the metadata file, 68% are redundant file paths.
> We enabled log-based checkpoint on this job and hoped that the checkpoint 
> could be completed within 30 seconds. This problem made it difficult to 
> achieve this goal.
> h3. Propose changes
> I suggest introducing MergedInputChannelStateHandle and 
> MergedResultSubpartitionStateHandle to eliminate redundant file paths.
> The taskmanager merges all InputChannelStateHandles with the same delegated 
> StreamStateHandle in the same subtask into one MergedInputChannelStateHandle 
> before reporting. When recovering from checkpoint, jobmanager converts 
> MergedInputChannelStateHandle to InputChannelStateHandle collection before 
> assigning state handle, and the rest of the process does not need to be 
> changed. 
> Structure of MergedInputChannelStateHandle :
>  
> {code:java}
> {   // MergedInputChannelStateHandle
>     "delegate": {
>         "filePath": 
> "viewfs://hadoop-meituan/flink-yg15/checkpoints/retained/1234567/ab8d0c2f02a47586490b15e7a2c30555/chk-31/ffe54c0a-9b6e-4724-aae7-61b96bf8b1cf",
>         "stateSize": 123456
>     },
>     "size": 2000,
>     "subtaskIndex":0,
>     "channels": [ // One InputChannel per element
>         {
>             "info": {
>                 "gateIdx": 0,
>                 "inputChannelIdx": 0
>             },
>             "offsets": [
>                 100,200,300,400
>             ],
>             "size": 1400
>         },
>         {
>             "info": {
>                 "gateIdx": 0,
>                 "inputChannelIdx": 1
>             },
>             "offsets": [
>                 500,600
>             ],
>             "size": 600
>         }
>     ]
> }
>  {code}
> MergedResultSubpartitionStateHandle is similar.
>  
>  
> WDYT [~roman] , [~pnowojski] , [~fanrui] ?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33734) Merge unaligned checkpoint state handle

2024-02-02 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17813646#comment-17813646
 ] 

Feifan Wang commented on FLINK-33734:
-

I found that only merging handles was not enough. Although the metadata was 
obviously smaller, it still took a long time for the jobmanager to process the 
RPCs and serialize the metadata. I speculate the reason is that although 
merging handles avoids duplicate file paths, the number of objects in the 
metadata is not significantly reduced (mainly channel infos). So I tried 
serializing the channel infos directly on the taskmanager side, and the test 
results showed that this worked well. Below are the results of my test:
{code:java}
# control group :
checkpoint time         (    s ) --- avg: 82.73    max: 155.88   min: 49.90
store metadata time     (    s ) --- avg: 23.45    max: 47.60    min: 9.61
metadata size           (   MB ) --- avg: 696.55   max: 954.98   min: 461.35
store metadata speed    ( MB/s ) --- avg: 32.41    max: 61.82    min: 18.29

# only merge handle :
checkpoint time         (    s ) --- avg: 66.22    max: 123.12   min: 38.68
store metadata time     (    s ) --- avg: 12.76    max: 26.18    min: 4.02
metadata size           (   MB ) --- avg: 269.14   max: 394.26   min: 159.86
store metadata speed    ( MB/s ) --- avg: 23.93    max: 46.55    min: 11.33

# not only merge handles, but also serialize channel infos on TaskManager :
checkpoint time         (    s ) --- avg: 30.63    max: 74.27    min: 5.16
store metadata time     (    s ) --- avg: 0.87     max: 11.23    min: 0.12
metadata size           (   MB ) --- avg: 232.22   max: 392.86   min: 45.34
store metadata speed    ( MB/s ) --- avg: 291.00   max: 386.80   min: 23.18{code}
Based on the above results, I think serializing channel infos on the 
taskmanager side should be done as part of the same change. I submitted a PR 
implementing this solution; please have a look [~pnowojski] , [~fanrui] , 
[~roman] , [~Zakelly] .
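
To make the idea concrete, here is a hedged sketch of the TM-side merge using 
simplified stand-in types (the real classes are InputChannelStateHandle and its 
delegated StreamStateHandle; all names below are illustrative, not the actual 
implementation):
{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public final class MergeSketch {

    record ChannelInfo(int gateIdx, int inputChannelIdx, long[] offsets, long size) {}

    record MergedHandle(String delegatePath, List<ChannelInfo> channels) {}

    // All channel handles of one subtask that share the same delegate file
    // collapse into a single handle, so each file path is stored only once;
    // the channel infos and offsets can then be serialized on the TM side.
    static List<MergedHandle> mergeBySharedFile(Map<String, List<ChannelInfo>> byDelegatePath) {
        List<MergedHandle> merged = new ArrayList<>();
        byDelegatePath.forEach((path, channels) -> merged.add(new MergedHandle(path, channels)));
        return merged;
    }
}
{code}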

> Merge unaligned checkpoint state handle
> ---
>
> Key: FLINK-33734
> URL: https://issues.apache.org/jira/browse/FLINK-33734
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: Feifan Wang
>Assignee: Feifan Wang
>Priority: Major
>  Labels: pull-request-available
>
> h3. Background
> Unaligned checkpoint will write the inflight-data of all InputChannel and 
> ResultSubpartition of the same subtask to the same file during checkpoint. 
> The InputChannelStateHandle and ResultSubpartitionStateHandle organize the 
> metadata of inflight-data at the channel granularity, which causes the file 
> name to be repeated many times. When a job is under backpressure and task 
> parallelism is high, the metadata of unaligned checkpoints will bloat. This 
> will result in:
>  # The amount of data reported by taskmanager to jobmanager increases, and 
> jobmanager takes longer to process these RPC requests.
>  # The metadata of the entire checkpoint becomes very large, and it takes 
> longer to serialize and write it to dfs.
> Both of the above points ultimately lead to longer checkpoint duration.
> h3. A Production example
> Take our production job with a parallelism of 4800 as an example:
>  # When there is no back pressure, checkpoint end-to-end duration is within 7 
> seconds.
>  # When under pressure: checkpoint end-to-end duration often exceeds 1 
> minute. We found that jobmanager took more than 40 seconds to process rpc 
> requests, and serialized metadata took more than 20 seconds. Some checkpoint 
> statistics:
> |metadata file size|950 MB|
> |channel state count|12,229,854|
> |channel file count|5536|
> Of the 950MB in the metadata file, 68% are redundant file paths.
> We enabled log-based checkpoint on this job and hoped that the checkpoint 
> could be completed within 30 seconds. This problem made it difficult to 
> achieve this goal.
> h3. Propose changes
> I suggest introducing MergedInputChannelStateHandle and 
> MergedResultSubpartitionStateHandle to eliminate redundant file paths.
> The taskmanager merges all InputChannelStateHandles with the same delegated 
> StreamStateHandle in the same subtask into one MergedInputChannelStateHandle 
> before reporting. When recovering from checkpoint, jobmanager converts 
> MergedInputChannelStateHandle to InputChannelStateHandle collection before 
> assigning state handle, and the rest of the process does not need to be 
> changed. 
> Structure of MergedInputChannelStat

[jira] [Commented] (FLINK-33734) Merge unaligned checkpoint state handle

2023-12-10 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17795222#comment-17795222
 ] 

Feifan Wang commented on FLINK-33734:
-

Thanks [~roman] . Merging handles in the JM's MetadataSerializer can only 
reduce the size of the metadata file, and DFS capacity tends not to be an 
issue. In the production example I mentioned above, the main problem is that 
checkpointing takes too long: we observed that it took more than 40 seconds 
for the JM to process the checkpoint ack RPCs and more than 20 seconds to 
serialize the metadata object. So I still think the handles should be merged 
on the TM side.
{quote}1. Does it make sense to also merge state from multiple subtasks (as 
implemented in FLINK-26803)?
{quote}
Yes, since multiple subtasks will reuse unaligned checkpoint files once that 
issue is completed, merging handles across subtasks can further reduce 
redundant data. But this may require changing the way the checkpoint metadata 
objects are organized. Moreover, that optimization is only constant-level, 
while merging handles within a subtask reduces the number of stored file paths 
from n^2 to n. So I'm not sure merging handles across subtasks is worth it at 
this stage.
{quote}2. What happens when the delegate is in-memory state handle 
(`ByteStreamStateHandle`)?
{quote}
IIUC, the ByteStreamStateHandle in each 
InputChannelStateHandle/ResultSubpartitionStateHandle is exclusive and uses a 
random UUID as the handle name. I just looked at this code and saw that 
FLINK-17972 was created while [~roman] was writing this code. I think the 
MergedInputChannelStateHandle mentioned above is an implementation of 
FLINK-17972. As for ByteStreamStateHandle, we may need to change 
channel-granular splitting to subtask-granular splitting. WDYT [~roman] ?

> Merge unaligned checkpoint state handle
> ---
>
> Key: FLINK-33734
> URL: https://issues.apache.org/jira/browse/FLINK-33734
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: Feifan Wang
>Assignee: Feifan Wang
>Priority: Major
>
> h3. Background
> Unaligned checkpoint will write the inflight-data of all InputChannel and 
> ResultSubpartition of the same subtask to the same file during checkpoint. 
> The InputChannelStateHandle and ResultSubpartitionStateHandle organize the 
> metadata of inflight-data at the channel granularity, which causes the file 
> name to be repeated many times. When a job is under backpressure and task 
> parallelism is high, the metadata of unaligned checkpoints will bloat. This 
> will result in:
>  # The amount of data reported by taskmanager to jobmanager increases, and 
> jobmanager takes longer to process these RPC requests.
>  # The metadata of the entire checkpoint becomes very large, and it takes 
> longer to serialize and write it to dfs.
> Both of the above points ultimately lead to longer checkpoint duration.
> h3. A Production example
> Take our production job with a parallelism of 4800 as an example:
>  # When there is no back pressure, checkpoint end-to-end duration is within 7 
> seconds.
>  # When under pressure: checkpoint end-to-end duration often exceeds 1 
> minute. We found that jobmanager took more than 40 seconds to process rpc 
> requests, and serialized metadata took more than 20 seconds.Some checkpoint 
> statistics:
> |metadata file size|950 MB|
> |channel state count|12,229,854|
> |channel file count|5536|
> Of the 950MB in the metadata file, 68% are redundant file paths.
> We enabled log-based checkpoint on this job and hoped that the checkpoint 
> could be completed within 30 seconds. This problem made it difficult to 
> achieve this goal.
> h3. Propose changes
> I suggest introducing MergedInputChannelStateHandle and 
> MergedResultSubpartitionStateHandle to eliminate redundant file paths.
> The taskmanager merges all InputChannelStateHandles with the same delegated 
> StreamStateHandle in the same subtask into one MergedInputChannelStateHandle 
> before reporting. When recovering from checkpoint, jobmangager converts 
> MergedInputChannelStateHandle to InputChannelStateHandle collection before 
> assigning state handle, and the rest of the process does not need to be 
> changed. 
> Structure of MergedInputChannelStateHandle :
>  
> {code:java}
> {   // MergedInputChannelStateHandle
>     "delegate": {
>         "filePath": 
> "viewfs://hadoop-meituan/flink-yg15/checkpoints/retained/1234567/ab8d0c2f02a47586490b15e7a2c30555/chk-31/ffe54c0a-9b6e-4724-aae7-61b96bf8b1cf",
>         "stateSize": 123456
>     },
>     "size": 2000,
>     "subtaskIndex":0,
>     "channels": [ // One InputChannel per element
>         {
>             "info": {
>                 "gateIdx": 0,
>                 "inputChannelIdx": 0
>             }

[jira] [Updated] (FLINK-17972) Consider restructuring channel state

2023-12-10 Thread Feifan Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feifan Wang updated FLINK-17972:

Description: 
(depends on rescaling for unaligned checkpoints (FLINK-17979))

 

Current structure is the following (this PR doesn't change it):
{code:java}
Each subtask reports to JM TaskStateSnapshot
  each with zero or more OperatorSubtaskState,
    each with zero or more InputChannelStateHandle and ResultSubpartitionStateHandle
      each referencing an underlying StreamStateHandle
{code}
The underlying {{StreamStateHandle}} duplicates filename 
({{ByteStreamStateHandle}} has it too at least because of 
{{equals/hashcode}} I guess).

An alternative would be something like
{code:java}
Each subtask reports to JM TaskStateSnapshot
  each with zero or more OperatorSubtaskState
    each with zero or one StreamStateHandle (for channel state)
    each with zero or more InputChannelStateHandle and ResultSubpartitionStateHandle{code}
(probably, with {{StreamStateHandle}}, {{InputChannelStateHandle}} and 
{{ResultSubpartitionStateHandle}} encapsulated)

 

It would be more effective (less data duplication) but probably also more 
error-prone (implicit structure), less flexible (re-scaling).

(as discussed during introduction of {{StreamStateHandle.asBytesIfInMemory}} 
[here|https://github.com/apache/flink/pull/12292#discussion_r429925802])
 

  was:
(depends on rescaling for unaligned checkpoints (FLINK-17979))

 

Current structure is the following (this PR doesn't change it):
{code:java}
Each subtask reports to JM TaskStateSnapshot
  each with zero or more OperatorSubtaskState,
    each with zero or more InputChannelStateHandle and ResultSubpartitionStateHandle
      each referencing an underlying StreamStateHandle
{code}
The underlying {{StreamStateHandle}} duplicates filename 
({{ByteStreamStateHandle}} has it too at least because of {{equals/hashcode}} I 
guess).

An alternative would be something like
{code:java}
Each subtask reports to JM TaskStateSnapshot
  each with zero or more OperatorSubtaskState
    each with zero or one StreamStateHandle (for channel state)
    each with zero or more InputChannelStateHandle and ResultSubpartitionStateHandle{code}
(probably, with {{StreamStateHandle}}, {{InputChannelStateHandle}} and 
{{ResultSubpartitionStateHandle}} encapsulated)

 

It would be more effective (less data duplication) but probably also more 
error-prone (implicit structure), less flexible (re-scaling).

(as discussed during introduction of {{StreamStateHandle.asBytesIfInMemory}} 
[here|https://github.com/apache/flink/pull/12292#discussion_r429925802])


> Consider restructuring channel state
> 
>
> Key: FLINK-17972
> URL: https://issues.apache.org/jira/browse/FLINK-17972
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.11.0
>Reporter: Roman Khachatryan
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> auto-unassigned
>
> (depends on rescaling for unaligned checkpoints (FLINK-17979))
>  
> Current structure is the following (this PR doesn't change it):
> {code:java}
> Each subtask reports to JM TaskStateSnapshot
>   each with zero or more OperatorSubtaskState,
>     each with zero or more InputChannelStateHandle and ResultSubpartitionStateHandle
>       each referencing an underlying StreamStateHandle
> {code}
> The underlying {{StreamStateHandle}} duplicates filename 
> ({{ByteStreamStateHandle}} has it too at least because of 
> {{equals/hashcode}} I guess).
> An alternative would be something like
> {code:java}
> Each subtask reports to JM TaskStateSnapshot
>   each with zero or more OperatorSubtaskState
>     each with zero or one StreamStateHandle (for channel state)
>     each with zero or more InputChannelStateHandle and ResultSubpartitionStateHandle{code}
> (probably, with {{StreamStateHandle}}, {{InputChannelStateHandle}} and 
> {{ResultSubpartitionStateHandle}} encapsulated)
>  
> It would be more effective (less data duplication) but probably also more 
> error-prone (implicit structure), less flexible (re-scaling).
> (as discussed during introduction of {{StreamStateHandle.asBytesIfInMemory}} 
> [here|https://github.com/apache/flink/pull/12292#discussion_r429925802])
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33734) Merge unaligned checkpoint state handle

2023-12-07 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17794146#comment-17794146
 ] 

Feifan Wang commented on FLINK-33734:
-

We do also want to improve recovery speed, but checkpointing time is currently 
the most important concern, so I completely agree with completing this in two 
steps.

 

While checking the source code, I found that the types of inputChannelState 
and resultSubpartitionState in OperatorSubtaskState are concrete classes 
rather than interfaces, and the serialization of these two handles does not 
use a handle type flag like the other handles do. This means that in order to 
introduce MergedInputChannelStateHandle, we may need to introduce a 
MetadataV5Serializer. What do you think? Or do you have any other suggestions? 
[~pnowojski] 

> Merge unaligned checkpoint state handle
> ---
>
> Key: FLINK-33734
> URL: https://issues.apache.org/jira/browse/FLINK-33734
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: Feifan Wang
>Assignee: Feifan Wang
>Priority: Major
>
> h3. Background
> Unaligned checkpoint will write the inflight-data of all InputChannel and 
> ResultSubpartition of the same subtask to the same file during checkpoint. 
> The InputChannelStateHandle and ResultSubpartitionStateHandle organize the 
> metadata of inflight-data at the channel granularity, which causes the file 
> name to be repeated many times. When a job is under backpressure and task 
> parallelism is high, the metadata of unaligned checkpoints will bloat. This 
> will result in:
>  # The amount of data reported by taskmanager to jobmanager increases, and 
> jobmanager takes longer to process these RPC requests.
>  # The metadata of the entire checkpoint becomes very large, and it takes 
> longer to serialize and write it to dfs.
> Both of the above points ultimately lead to longer checkpoint duration.
> h3. A Production example
> Take our production job with a parallelism of 4800 as an example:
>  # When there is no back pressure, checkpoint end-to-end duration is within 7 
> seconds.
>  # When under pressure: checkpoint end-to-end duration often exceeds 1 
> minute. We found that jobmanager took more than 40 seconds to process rpc 
> requests, and serialized metadata took more than 20 seconds. Some checkpoint 
> statistics:
> |metadata file size|950 MB|
> |channel state count|12,229,854|
> |channel file count|5536|
> Of the 950MB in the metadata file, 68% are redundant file paths.
> We enabled log-based checkpoint on this job and hoped that the checkpoint 
> could be completed within 30 seconds. This problem made it difficult to 
> achieve this goal.
> h3. Propose changes
> I suggest introducing MergedInputChannelStateHandle and 
> MergedResultSubpartitionStateHandle to eliminate redundant file paths.
> The taskmanager merges all InputChannelStateHandles with the same delegated 
> StreamStateHandle in the same subtask into one MergedInputChannelStateHandle 
> before reporting. When recovering from checkpoint, jobmanager converts 
> MergedInputChannelStateHandle to InputChannelStateHandle collection before 
> assigning state handle, and the rest of the process does not need to be 
> changed. 
> Structure of MergedInputChannelStateHandle :
>  
> {code:java}
> {   // MergedInputChannelStateHandle
>     "delegate": {
>         "filePath": 
> "viewfs://hadoop-meituan/flink-yg15/checkpoints/retained/1234567/ab8d0c2f02a47586490b15e7a2c30555/chk-31/ffe54c0a-9b6e-4724-aae7-61b96bf8b1cf",
>         "stateSize": 123456
>     },
>     "size": 2000,
>     "subtaskIndex":0,
>     "channels": [ // One InputChannel per element
>         {
>             "info": {
>                 "gateIdx": 0,
>                 "inputChannelIdx": 0
>             },
>             "offsets": [
>                 100,200,300,400
>             ],
>             "size": 1400
>         },
>         {
>             "info": {
>                 "gateIdx": 0,
>                 "inputChannelIdx": 1
>             },
>             "offsets": [
>                 500,600
>             ],
>             "size": 600
>         }
>     ]
> }
>  {code}
> MergedResultSubpartitionStateHandle is similar.
>  
>  
> WDYT [~roman] , [~pnowojski] , [~fanrui] ?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33734) Merge unaligned checkpoint state handle

2023-12-06 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17793998#comment-17793998
 ] 

Feifan Wang commented on FLINK-33734:
-

Thanks to [~Zakelly] , [~pnowojski] and [~fanrui]  for participating in the 
discussion. 

 

to [~Zakelly] :

Yes, this proposal only aims to reduce the metadata size of unaligned 
checkpoints. I also think that FLIP-306 does not solve the above problems; at 
the same time, my proposal can work together with FLIP-306.

 


to [~pnowojski] : 
{quote}That sending out the RPCs during recovery will take a long time? 
{quote}
Yes, in theory sending these RPCs during recovery also takes a long time, but 
we have not paid attention to it before. First, our jobs can accept a recovery 
time of several minutes from a business perspective. Second, this kind of 
checkpoint only occurs during backpressure, and we have not yet tried to 
restore a job from one.
{quote}Wouldn't it be better to keep the state handles merged during recovery 
until they reach their destined subtasks on TMs?
{quote}
I hold the same view as [~fanrui] on this issue. It is acceptable to me to 
address checkpoint creation and recovery in two separate steps.

 

 

to [~fanrui] :
{quote}Can we think the _metadata file size will be reduced 68% after this 
proposal?
{quote}
Yes, but only for checkpoints where unaligned checkpoint handles account for 
the vast majority as mentioned above.
{quote}How does flink serialize the MergedInputChannelStateHandle? Does it 
store the field name? 
{quote}
The current serialization method of metadata objects is compact, and field 
names are not saved in the file. The serialization of each handle is hardcoded.
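
As an illustration of what "compact and hardcoded" means here, a sketch (not 
the actual Flink serializer) that writes values in a fixed order with no field 
names or self-describing schema, so the reader must know the exact layout:
{code:java}
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public final class CompactSerializationSketch {

    // Serialize one channel's info and offsets without any field names:
    // the reader must know the order and types in advance.
    static byte[] serializeChannel(int gateIdx, int inputChannelIdx, long[] offsets)
            throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(gateIdx);
            out.writeInt(inputChannelIdx);
            out.writeInt(offsets.length);   // length prefix for the offsets array
            for (long offset : offsets) {
                out.writeLong(offset);
            }
        }
        return bytes.toByteArray();
    }
}
{code}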

 

 

> Merge unaligned checkpoint state handle
> ---
>
> Key: FLINK-33734
> URL: https://issues.apache.org/jira/browse/FLINK-33734
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: Feifan Wang
>Assignee: Feifan Wang
>Priority: Major
>
> h3. Background
> Unaligned checkpoint will write the inflight-data of all InputChannel and 
> ResultSubpartition of the same subtask to the same file during checkpoint. 
> The InputChannelStateHandle and ResultSubpartitionStateHandle organize the 
> metadata of inflight-data at the channel granularity, which causes the file 
> name to be repeated many times. When a job is under backpressure and task 
> parallelism is high, the metadata of unaligned checkpoints will bloat. This 
> will result in:
>  # The amount of data reported by taskmanager to jobmanager increases, and 
> jobmanager takes longer to process these RPC requests.
>  # The metadata of the entire checkpoint becomes very large, and it takes 
> longer to serialize and write it to dfs.
> Both of the above points ultimately lead to longer checkpoint duration.
> h3. A Production example
> Take our production job with a parallelism of 4800 as an example:
>  # When there is no back pressure, checkpoint end-to-end duration is within 7 
> seconds.
>  # When under pressure: checkpoint end-to-end duration often exceeds 1 
> minute. We found that jobmanager took more than 40 seconds to process rpc 
> requests, and serialized metadata took more than 20 seconds. Some checkpoint 
> statistics:
> |metadata file size|950 MB|
> |channel state count|12,229,854|
> |channel file count|5536|
> Of the 950MB in the metadata file, 68% are redundant file paths.
> We enabled log-based checkpoint on this job and hoped that the checkpoint 
> could be completed within 30 seconds. This problem made it difficult to 
> achieve this goal.
> h3. Propose changes
> I suggest introducing MergedInputChannelStateHandle and 
> MergedResultSubpartitionStateHandle to eliminate redundant file paths.
> The taskmanager merges all InputChannelStateHandles with the same delegated 
> StreamStateHandle in the same subtask into one MergedInputChannelStateHandle 
> before reporting. When recovering from checkpoint, jobmanager converts 
> MergedInputChannelStateHandle to InputChannelStateHandle collection before 
> assigning state handle, and the rest of the process does not need to be 
> changed. 
> Structure of MergedInputChannelStateHandle :
>  
> {code:java}
> {   // MergedInputChannelStateHandle
>     "delegate": {
>         "filePath": 
> "viewfs://hadoop-meituan/flink-yg15/checkpoints/retained/1234567/ab8d0c2f02a47586490b15e7a2c30555/chk-31/ffe54c0a-9b6e-4724-aae7-61b96bf8b1cf",
>         "stateSize": 123456
>     },
>     "size": 2000,
>     "subtaskIndex":0,
>     "channels": [ // One InputChannel per element
>         {
>             "info": {
>                 "gateIdx": 0,
>                 "inputChannelIdx": 0
>             },
>             "offsets": [
>                 100,200,30

[jira] [Updated] (FLINK-33734) Merge unaligned checkpoint state handle

2023-12-04 Thread Feifan Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feifan Wang updated FLINK-33734:

Description: 
h3. Background

Unaligned checkpoint will write the inflight-data of all InputChannel and 
ResultSubpartition of the same subtask to the same file during checkpoint. The 
InputChannelStateHandle and ResultSubpartitionStateHandle organize the metadata 
of inflight-data at the channel granularity, which causes the file name to be 
repeated many times. When a job is under backpressure and task parallelism is 
high, the metadata of unaligned checkpoints will bloat. This will result in:
 # The amount of data reported by taskmanager to jobmanager increases, and 
jobmanager takes longer to process these RPC requests.
 # The metadata of the entire checkpoint becomes very large, and it takes 
longer to serialize and write it to dfs.

Both of the above points ultimately lead to longer checkpoint duration.
h3. A Production example

Take our production job with a parallelism of 4800 as an example:
 # When there is no back pressure, checkpoint end-to-end duration is within 7 
seconds.
 # When under pressure: checkpoint end-to-end duration often exceeds 1 minute. 
We found that jobmanager took more than 40 seconds to process rpc requests, and 
serialized metadata took more than 20 seconds. Some checkpoint statistics:
|metadata file size|950 MB|
|channel state count|12,229,854|
|channel file count|5536|

Of the 950MB in the metadata file, 68% are redundant file paths.

We enabled log-based checkpoint on this job and hoped that the checkpoint could 
be completed within 30 seconds. This problem made it difficult to achieve this 
goal.
h3. Propose changes

I suggest introducing MergedInputChannelStateHandle and 
MergedResultSubpartitionStateHandle to eliminate redundant file paths.

The taskmanager merges all InputChannelStateHandles with the same delegated 
StreamStateHandle in the same subtask into one MergedInputChannelStateHandle 
before reporting. When recovering from checkpoint, jobmanager converts 
MergedInputChannelStateHandle to InputChannelStateHandle collection before 
assigning state handle, and the rest of the process does not need to be 
changed. 

Structure of MergedInputChannelStateHandle :

 
{code:java}
{   // MergedInputChannelStateHandle
    "delegate": {
        "filePath": 
"viewfs://hadoop-meituan/flink-yg15/checkpoints/retained/1234567/ab8d0c2f02a47586490b15e7a2c30555/chk-31/ffe54c0a-9b6e-4724-aae7-61b96bf8b1cf",
        "stateSize": 123456
    },
    "size": 2000,
    "subtaskIndex":0,
    "channels": [ // One InputChannel per element
        {
            "info": {
                "gateIdx": 0,
                "inputChannelIdx": 0
            },
            "offsets": [
                100,200,300,400
            ],
            "size": 1400
        },
        {
            "info": {
                "gateIdx": 0,
                "inputChannelIdx": 1
            },
            "offsets": [
                500,600
            ],
            "size": 600
        }
    ]
}
 {code}
MergedResultSubpartitionStateHandle is similar.
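
As a rough sketch of the taskmanager-side merge step (the grouping by the 
delegated StreamStateHandle is the point; _fromChannelHandles_ is a 
hypothetical factory method):
{code:java}
// Sketch only: group channel state handles by their delegated StreamStateHandle
// and collapse each group into one merged handle before reporting to jobmanager.
Map<StreamStateHandle, List<InputChannelStateHandle>> byDelegate =
        channelHandles.stream()
                .collect(Collectors.groupingBy(InputChannelStateHandle::getDelegate));

List<MergedInputChannelStateHandle> merged = new ArrayList<>();
for (Map.Entry<StreamStateHandle, List<InputChannelStateHandle>> entry : byDelegate.entrySet()) {
    // fromChannelHandles (hypothetical) records the shared file path once and
    // keeps only the per-channel infos, offsets and sizes
    merged.add(MergedInputChannelStateHandle.fromChannelHandles(entry.getKey(), entry.getValue()));
}
{code}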

 

 

WDYT [~roman] , [~pnowojski] , [~fanrui] ?

  was:
h3. Background

Unaligned checkpoint will write the inflight-data of all InputChannel and 
ResultSubpartition of the same subtask to the same file during checkpoint. The 
InputChannelStateHandle and ResultSubpartitionStateHandle organize the metadata 
of inflight-data at the channel granularity, which causes the file name to be 
repeated many times. When a job is under backpressure and task parallelism is 
high, the metadata of unaligned checkpoints will bloat. This will result in:
 # The amount of data reported by taskmanager to jobmanager increases, and 
jobmanager takes longer to process these RPC requests.
 # The metadata of the entire checkpoint becomes very large, and it takes 
longer to serialize and write it to dfs.

Both of the above points ultimately lead to longer checkpoint duration.
h3. A Production example

Take our production job with a parallelism of 4800 as an example:
 # When there is no back pressure, checkpoint end-to-end duration is within 7 
seconds.
 # When under pressure: checkpoint end-to-end duration often exceeds 1 minute. 
We found that jobmanager took more than 40 seconds to process rpc requests, and 
serialized metadata took more than 20 seconds. Some checkpoint statistics:
|metadata file size|950 MB|
|channel state count|12,229,854|
|channel file count|5536|
Of the 950MB in the metadata file, 68% are redundant file paths.

We enabled log-based checkpoint on this job and hoped that the checkpoint could 
be completed within 30 seconds. This problem made it difficult to achieve this 
goal.
h3. Propose changes

I suggest introducing MergedInputChannelStateHandle and 
MergedResultSubpartitionStateHandle to eliminate redundant file paths.

The taskmanager merges all InputChannelStateHandles

[jira] [Updated] (FLINK-33734) Merge unaligned checkpoint state handle

2023-12-04 Thread Feifan Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feifan Wang updated FLINK-33734:

Component/s: Runtime / Checkpointing
Description: 
h3. Background

Unaligned checkpoint will write the inflight-data of all InputChannel and 
ResultSubpartition of the same subtask to the same file during checkpoint. The 
InputChannelStateHandle and ResultSubpartitionStateHandle organize the metadata 
of inflight-data at the channel granularity, which causes the file name to be 
repeated many times. When a job is under backpressure and task parallelism is 
high, the metadata of unaligned checkpoints will bloat. This will result in:
 # The amount of data reported by taskmanager to jobmanager increases, and 
jobmanager takes longer to process these RPC requests.
 # The metadata of the entire checkpoint becomes very large, and it takes 
longer to serialize and write it to dfs.

Both of the above points ultimately lead to longer checkpoint duration.
h3. A Production example

Take our production job with a parallelism of 4800 as an example:
 # When there is no back pressure, checkpoint end-to-end duration is within 7 
seconds.
 # When under pressure: checkpoint end-to-end duration often exceeds 1 minute. 
We found that jobmanager took more than 40 seconds to process rpc requests, and 
serialized metadata took more than 20 seconds. Some checkpoint statistics:
|metadata file size|950 MB|
|channel state count|12,229,854|
|channel file count|5536|
Of the 950MB in the metadata file, 68% are redundant file paths.

We enabled log-based checkpoint on this job and hoped that the checkpoint could 
be completed within 30 seconds. This problem made it difficult to achieve this 
goal.
h3. Propose changes

I suggest introducing MergedInputChannelStateHandle and 
MergedResultSubpartitionStateHandle to eliminate redundant file paths.

The taskmanager merges all InputChannelStateHandles with the same delegated 
StreamStateHandle in the same subtask into one MergedInputChannelStateHandle 
before reporting. When recovering from checkpoint, jobmanager converts 
MergedInputChannelStateHandle to InputChannelStateHandle collection before 
assigning state handle, and the rest of the process does not need to be 
changed. 

Structure of MergedInputChannelStateHandle :

 
{code:java}
{   // MergedInputChannelStateHandle
    "delegate": {
        "filePath": 
"viewfs://hadoop-meituan/flink-yg15/checkpoints/retained/1361195/ab8d0c2f02a47586490b15e7a2c30555/chk-31/ffe54c0a-9b6e-4724-aae7-61b96bf8b1cf",
        "stateSize": 123456
    },
    "size": 2000,
    "subtaskIndex":0,
    "channels": [ // One InputChannel per element
        {
            "info": {
                "gateIdx": 0,
                "inputChannelIdx": 0
            },
            "offsets": [
                100,200,300,400
            ],
            "size": 1400
        },
        {
            "info": {
                "gateIdx": 0,
                "inputChannelIdx": 1
            },
            "offsets": [
                500,600
            ],
            "size": 600
        }
    ]
}
 {code}
MergedResultSubpartitionStateHandle is similar.

 

 

WDYT [~roman] , [~pnowojski] , [~fanrui] ?
Summary: Merge unaligned checkpoint state handle  (was: Merge unaligned 
checkpoint )

> Merge unaligned checkpoint state handle
> ---
>
> Key: FLINK-33734
> URL: https://issues.apache.org/jira/browse/FLINK-33734
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: Feifan Wang
>Priority: Major
>
> h3. Background
> Unaligned checkpoint will write the inflight-data of all InputChannel and 
> ResultSubpartition of the same subtask to the same file during checkpoint. 
> The InputChannelStateHandle and ResultSubpartitionStateHandle organize the 
> metadata of inflight-data at the channel granularity, which causes the file 
> name to be repeated many times. When a job is under backpressure and task 
> parallelism is high, the metadata of unaligned checkpoints will bloat. This 
> will result in:
>  # The amount of data reported by taskmanager to jobmanager increases, and 
> jobmanager takes longer to process these RPC requests.
>  # The metadata of the entire checkpoint becomes very large, and it takes 
> longer to serialize and write it to dfs.
> Both of the above points ultimately lead to longer checkpoint duration.
> h3. A Production example
> Take our production job with a parallelism of 4800 as an example:
>  # When there is no back pressure, checkpoint end-to-end duration is within 7 
> seconds.
>  # When under pressure: checkpoint end-to-end duration often exceeds 1 
> minute. We found that jobmanager took more than 40 seconds to process rpc 
> requests, and serialized metadata took more than 20 seconds. Some checkpoi

[jira] [Created] (FLINK-33734) Merge unaligned checkpoint

2023-12-03 Thread Feifan Wang (Jira)
Feifan Wang created FLINK-33734:
---

 Summary: Merge unaligned checkpoint 
 Key: FLINK-33734
 URL: https://issues.apache.org/jira/browse/FLINK-33734
 Project: Flink
  Issue Type: Improvement
Reporter: Feifan Wang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-32901) Should set initial initialSequenceNumber of StateChangelogWriter after restore

2023-08-28 Thread Feifan Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feifan Wang closed FLINK-32901.
---
Resolution: Not A Problem

> Should set initial initialSequenceNumber of StateChangelogWriter after restore
> --
>
> Key: FLINK-32901
> URL: https://issues.apache.org/jira/browse/FLINK-32901
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: Feifan Wang
>Priority: Minor
>
> The current StateChangelogWriter lacks an interface to set the initial 
> sequence number, and FsStateChangelogWriter always sets the initial sequence 
> number to 0. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32901) Should set initial initialSequenceNumber of StateChangelogWriter after restore

2023-08-28 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17759531#comment-17759531
 ] 

Feifan Wang commented on FLINK-32901:
-

Sorry [~masteryhx] , I previously thought that the sequence number starting 
from 0 after restore was the cause of a bug, but later I found that it was 
caused by another reason 
([FLINK-32908|https://issues.apache.org/jira/browse/FLINK-32908]). Sorry for 
not closing this ticket in time.

> Should set initial initialSequenceNumber of StateChangelogWriter after restore
> --
>
> Key: FLINK-32901
> URL: https://issues.apache.org/jira/browse/FLINK-32901
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: Feifan Wang
>Priority: Minor
>
> The current StateChangelogWriter lacks an interface to set the initial 
> sequence number, and FsStateChangelogWriter always sets the initial sequence 
> number to 0. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32908) Fix wrong materialization id setting when switching from non-log-based checkpoint to log-based checkpoint

2023-08-22 Thread Feifan Wang (Jira)
Feifan Wang created FLINK-32908:
---

 Summary: Fix wrong materialization id setting when switching from 
non-log-based checkpoint to log-based checkpoint
 Key: FLINK-32908
 URL: https://issues.apache.org/jira/browse/FLINK-32908
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Reporter: Feifan Wang


The initial materialization ID should be set to the checkpoint ID when 
switching from non-log-based checkpoints to log-based checkpoints. Currently 
the initial id is set to 0, which causes the incremental snapshots of the 
inner backend to go wrong.
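
A minimal sketch of the intended behavior (hypothetical names, not the actual 
patch):
{code:java}
// Sketch only: when the restored state carries no changelog materialization
// info (i.e. we are switching from a non-log-based checkpoint), seed the
// materialization id from the restored checkpoint id instead of 0.
long initialMaterializationId =
        restoredMaterializationInfo.isPresent()
                ? restoredMaterializationInfo.get().getMaterializationID()
                : restoredCheckpointId; // previously 0, breaking the inner
                                        // backend's incremental snapshots
{code}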

PTAL [~Yanfei Lei] , [~roman] .



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32901) Should set initial initialSequenceNumber of StateChangelogWriter after restore

2023-08-21 Thread Feifan Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feifan Wang updated FLINK-32901:

Description: The current StateChangelogWriter lacks an interface to set the 
initial sequence number, and FsStateChangelogWriter always sets the initial 
sequence number to 0.   (was: The current StateChangelogWriter lacks an 
interface to set the initial sequence number, and FsStateChangelogWriter always 
sets the initial sequence number to 0. This causes the first materialization 
after recovery from the retained checkpoint to be delayed.)

> Should set initial initialSequenceNumber of StateChangelogWriter after restore
> --
>
> Key: FLINK-32901
> URL: https://issues.apache.org/jira/browse/FLINK-32901
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: Feifan Wang
>Priority: Minor
>
> The current StateChangelogWriter lacks an interface to set the initial 
> sequence number, and FsStateChangelogWriter always sets the initial sequence 
> number to 0. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32901) Should set initial initialSequenceNumber of StateChangelogWriter after restore

2023-08-21 Thread Feifan Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feifan Wang updated FLINK-32901:

Priority: Minor  (was: Critical)

> Should set initial initialSequenceNumber of StateChangelogWriter after restore
> --
>
> Key: FLINK-32901
> URL: https://issues.apache.org/jira/browse/FLINK-32901
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: Feifan Wang
>Priority: Minor
>
> The current StateChangelogWriter lacks an interface to set the initial 
> sequence number, and FsStateChangelogWriter always sets the initial sequence 
> number to 0. This causes the first materialization after recovery from the 
> retained checkpoint to be delayed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32901) Should set initial initialSequenceNumber of StateChangelogWriter after restore

2023-08-21 Thread Feifan Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feifan Wang updated FLINK-32901:

Component/s: Runtime / Checkpointing
 Runtime / State Backends
Description: The current StateChangelogWriter lacks an interface to set the 
initial sequence number, and FsStateChangelogWriter always sets the initial 
sequence number to 0. This causes the first materialization after recovery from 
the retained checkpoint to be delayed.
Summary: Should set initial initialSequenceNumber of 
StateChangelogWriter after restore  (was: Should set initial)

> Should set initial initialSequenceNumber of StateChangelogWriter after restore
> --
>
> Key: FLINK-32901
> URL: https://issues.apache.org/jira/browse/FLINK-32901
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: Feifan Wang
>Priority: Critical
>
> The current StateChangelogWriter lacks an interface to set the initial 
> sequence number, and FsStateChangelogWriter always sets the initial sequence 
> number to 0. This causes the first materialization after recovery from the 
> retained checkpoint to be delayed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32901) Should set initial

2023-08-21 Thread Feifan Wang (Jira)
Feifan Wang created FLINK-32901:
---

 Summary: Should set initial
 Key: FLINK-32901
 URL: https://issues.apache.org/jira/browse/FLINK-32901
 Project: Flink
  Issue Type: Bug
Reporter: Feifan Wang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30863) Register local recovery files of changelog before notifyCheckpointComplete()

2023-08-13 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17753869#comment-17753869
 ] 

Feifan Wang commented on FLINK-30863:
-

Since [~Yanfei Lei]  is still working on the ticket, I removed the 
'stale-assigned' tag.

> Register local recovery files of changelog before notifyCheckpointComplete()
> 
>
> Key: FLINK-30863
> URL: https://issues.apache.org/jira/browse/FLINK-30863
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.17.0
>Reporter: Yanfei Lei
>Assignee: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available
> Attachments: tm-log_fail_cl_local_recovery.txt
>
>
> If TM is materialized before receiving confirm(), the previously uploaded 
> queue in `FsStateChangelogWriter` will be cleared, so the local files of the 
> completed checkpoint will not be registered again, while the JM owned files 
> are registered before confirm(), and do not depend on the uploaded queue, so 
> the local files are deleted, and the DFS files are still there. 
>  
> We have encountered the following situation, the job cannot find the local 
> recovery files, but can restore from the DFS files:
> {code:java}
> 2023-01-18 17:21:13,412 [SlidingProcessingTimeWindows (37/48)#1] INFO  
> org.apache.flink.runtime.taskmanager.Task                    [] - 
> SlidingProcessingTimeWindows (37/48)#1 #1 (fa12cfa3b811a351e031b036b0e85d91) 
> switched from DEPLOYING to INITIALIZING.
> 2023-01-18 17:21:13,440 [SlidingProcessingTimeWindows (37/48)#1] INFO  
> org.apache.flink.runtime.state.TaskLocalStateStoreImpl       [] - Found 
> registered local state for checkpoint 11599 in subtask 
> (2daf1d9bc9ed40ecb191303db813b0de - 0a448493b4782967b150582570326227 - 36) : 
> TaskOperatorSubtaskStates{subtaskStatesByOperatorID={0a448493b4782967b150582570326227=SubtaskState{operatorStateFromBackend=StateObjectCollection{[]},
>  operatorStateFromStream=StateObjectCollection{[]}, 
> keyedStateFromBackend=StateObjectCollection{[org.apache.flink.runtime.state.changelog.ChangelogStateBackendLocalHandle@38aa46db]},
>  keyedStateFromStream=StateObjectCollection{[]}, 
> inputChannelState=StateObjectCollection{[]}, 
> resultSubpartitionState=StateObjectCollection{[]}, stateSize=1764644202, 
> checkpointedSize=1997682}}, isTaskDeployedAsFinished=false, 
> isTaskFinished=false}
> 2023-01-18 17:21:13,442 [SlidingProcessingTimeWindows (37/48)#1] INFO  
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - 
> Getting managed memory shared cache for RocksDB.
> 2023-01-18 17:21:13,446 [SlidingProcessingTimeWindows (37/48)#1] INFO  
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - 
> Obtained shared RocksDB cache of size 1438814063 bytes
> 2023-01-18 17:21:13,447 [SlidingProcessingTimeWindows (37/48)#1] INFO  
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation
>  [] - Starting to restore from state handle: 
> IncrementalLocalKeyedStateHandle{metaDataState=File State: 
> file:/opt/flink/flink-tmp-dir/tm_job-2daf1d9b-c9ed-40ec-b191-303db813b0de-taskmanager-1-31/localState/aid_45af7e6b612dad10b60554d81323d5f3/jid_2daf1d9bc9ed40ecb191303db813b0de/vtx_0a448493b4782967b150582570326227_sti_36/chk_125/0d082666-bd31-4ebe-9977-545c0d9b18a5
>  [1187 bytes]} 
> DirectoryKeyedStateHandle{directoryStateHandle=DirectoryStateHandle{directory=/opt/flink/flink-tmp-dir/tm_job-2daf1d9b-c9ed-40ec-b191-303db813b0de-taskmanager-1-31/localState/aid_45af7e6b612dad10b60554d81323d5f3/jid_2daf1d9bc9ed40ecb191303db813b0de/vtx_0a448493b4782967b150582570326227_sti_36/chk_125/b3e1d20f164d4c5baed291f5d1224183},
>  keyGroupRange=KeyGroupRange{startKeyGroup=96, endKeyGroup=98}} without 
> rescaling.
> 2023-01-18 17:21:13,495 [SlidingProcessingTimeWindows (37/48)#1] INFO  
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation
>  [] - Finished restoring from state handle: 
> IncrementalLocalKeyedStateHandle{metaDataState=File State: 
> file:/opt/flink/flink-tmp-dir/tm_job-2daf1d9b-c9ed-40ec-b191-303db813b0de-taskmanager-1-31/localState/aid_45af7e6b612dad10b60554d81323d5f3/jid_2daf1d9bc9ed40ecb191303db813b0de/vtx_0a448493b4782967b150582570326227_sti_36/chk_125/0d082666-bd31-4ebe-9977-545c0d9b18a5
>  [1187 bytes]} 
> DirectoryKeyedStateHandle{directoryStateHandle=DirectoryStateHandle{directory=/opt/flink/flink-tmp-dir/tm_job-2daf1d9b-c9ed-40ec-b191-303db813b0de-taskmanager-1-31/localState/aid_45af7e6b612dad10b60554d81323d5f3/jid_2daf1d9bc9ed40ecb191303db813b0de/vtx_0a448493b4782967b150582570326227_sti_36/chk_125/b3e1d20f164d4c5baed291f5d1224183},
>  keyGroupRange=KeyGroupRange{startKeyGroup=96, e

[jira] [Updated] (FLINK-30863) Register local recovery files of changelog before notifyCheckpointComplete()

2023-08-13 Thread Feifan Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feifan Wang updated FLINK-30863:

Labels: pull-request-available  (was: pull-request-available stale-assigned)

> Register local recovery files of changelog before notifyCheckpointComplete()
> 
>
> Key: FLINK-30863
> URL: https://issues.apache.org/jira/browse/FLINK-30863
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.17.0
>Reporter: Yanfei Lei
>Assignee: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available
> Attachments: tm-log_fail_cl_local_recovery.txt
>
>
> If TM is materialized before receiving confirm(), the previously uploaded 
> queue in `FsStateChangelogWriter` will be cleared, so the local files of the 
> completed checkpoint will not be registered again, while the JM owned files 
> are registered before confirm(), and do not depend on the uploaded queue, so 
> the local files are deleted, and the DFS files are still there. 
>  
> We have encountered the following situation, the job cannot find the local 
> recovery files, but can restore from the DFS files:
> {code:java}
> 2023-01-18 17:21:13,412 [SlidingProcessingTimeWindows (37/48)#1] INFO  
> org.apache.flink.runtime.taskmanager.Task                    [] - 
> SlidingProcessingTimeWindows (37/48)#1 #1 (fa12cfa3b811a351e031b036b0e85d91) 
> switched from DEPLOYING to INITIALIZING.
> 2023-01-18 17:21:13,440 [SlidingProcessingTimeWindows (37/48)#1] INFO  
> org.apache.flink.runtime.state.TaskLocalStateStoreImpl       [] - Found 
> registered local state for checkpoint 11599 in subtask 
> (2daf1d9bc9ed40ecb191303db813b0de - 0a448493b4782967b150582570326227 - 36) : 
> TaskOperatorSubtaskStates{subtaskStatesByOperatorID={0a448493b4782967b150582570326227=SubtaskState{operatorStateFromBackend=StateObjectCollection{[]},
>  operatorStateFromStream=StateObjectCollection{[]}, 
> keyedStateFromBackend=StateObjectCollection{[org.apache.flink.runtime.state.changelog.ChangelogStateBackendLocalHandle@38aa46db]},
>  keyedStateFromStream=StateObjectCollection{[]}, 
> inputChannelState=StateObjectCollection{[]}, 
> resultSubpartitionState=StateObjectCollection{[]}, stateSize=1764644202, 
> checkpointedSize=1997682}}, isTaskDeployedAsFinished=false, 
> isTaskFinished=false}
> 2023-01-18 17:21:13,442 [SlidingProcessingTimeWindows (37/48)#1] INFO  
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - 
> Getting managed memory shared cache for RocksDB.
> 2023-01-18 17:21:13,446 [SlidingProcessingTimeWindows (37/48)#1] INFO  
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - 
> Obtained shared RocksDB cache of size 1438814063 bytes
> 2023-01-18 17:21:13,447 [SlidingProcessingTimeWindows (37/48)#1] INFO  
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation
>  [] - Starting to restore from state handle: 
> IncrementalLocalKeyedStateHandle{metaDataState=File State: 
> file:/opt/flink/flink-tmp-dir/tm_job-2daf1d9b-c9ed-40ec-b191-303db813b0de-taskmanager-1-31/localState/aid_45af7e6b612dad10b60554d81323d5f3/jid_2daf1d9bc9ed40ecb191303db813b0de/vtx_0a448493b4782967b150582570326227_sti_36/chk_125/0d082666-bd31-4ebe-9977-545c0d9b18a5
>  [1187 bytes]} 
> DirectoryKeyedStateHandle{directoryStateHandle=DirectoryStateHandle{directory=/opt/flink/flink-tmp-dir/tm_job-2daf1d9b-c9ed-40ec-b191-303db813b0de-taskmanager-1-31/localState/aid_45af7e6b612dad10b60554d81323d5f3/jid_2daf1d9bc9ed40ecb191303db813b0de/vtx_0a448493b4782967b150582570326227_sti_36/chk_125/b3e1d20f164d4c5baed291f5d1224183},
>  keyGroupRange=KeyGroupRange{startKeyGroup=96, endKeyGroup=98}} without 
> rescaling.
> 2023-01-18 17:21:13,495 [SlidingProcessingTimeWindows (37/48)#1] INFO  
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation
>  [] - Finished restoring from state handle: 
> IncrementalLocalKeyedStateHandle{metaDataState=File State: 
> file:/opt/flink/flink-tmp-dir/tm_job-2daf1d9b-c9ed-40ec-b191-303db813b0de-taskmanager-1-31/localState/aid_45af7e6b612dad10b60554d81323d5f3/jid_2daf1d9bc9ed40ecb191303db813b0de/vtx_0a448493b4782967b150582570326227_sti_36/chk_125/0d082666-bd31-4ebe-9977-545c0d9b18a5
>  [1187 bytes]} 
> DirectoryKeyedStateHandle{directoryStateHandle=DirectoryStateHandle{directory=/opt/flink/flink-tmp-dir/tm_job-2daf1d9b-c9ed-40ec-b191-303db813b0de-taskmanager-1-31/localState/aid_45af7e6b612dad10b60554d81323d5f3/jid_2daf1d9bc9ed40ecb191303db813b0de/vtx_0a448493b4782967b150582570326227_sti_36/chk_125/b3e1d20f164d4c5baed291f5d1224183},
>  keyGroupRange=KeyGroupRange{startKeyGroup=96, endKeyGroup=98}} without 
> rescaling.
> 2023-01-18 17:21:13,495 [Sl

[jira] [Commented] (FLINK-31139) not upload empty state changelog file

2023-08-13 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17753725#comment-17753725
 ] 

Feifan Wang commented on FLINK-31139:
-

Hi [~roman] , is there any way to re-run the failed stages for 1.17 ?

> not upload empty state changelog file
> -
>
> Key: FLINK-31139
> URL: https://issues.apache.org/jira/browse/FLINK-31139
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.15.3, 1.16.1
>Reporter: Feifan Wang
>Assignee: Feifan Wang
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.16.3, 1.17.2
>
> Attachments: image-2023-02-20-19-51-34-397.png
>
>
> h1. Problem
> *_BatchingStateChangeUploadScheduler_* will upload many empty changelog files 
> (file size == 1 byte: the file contains only the compression flag).
> !image-2023-02-20-19-51-34-397.png|width=1062,height=188!
> These files are not referenced by any checkpoints, are not cleaned up, and 
> become more numerous as the job runs. Taking our big job as an example, 2292 
> such files were generated within 7 hours. At that rate, in about 4 months the 
> number of files in the changelog directory will exceed a million.
> h1. Problem causes
> This problem is caused by *_BatchingStateChangeUploadScheduler#drainAndSave_* 
> not checking whether the task collection is empty. The data in the scheduled 
> queue may have already been uploaded by the time 
> _*BatchingStateChangeUploadScheduler#drainAndSave*_ is executed.
>  
> So we should check whether the task collection is empty in 
> *_BatchingStateChangeUploadScheduler#drainAndSave_* . WDYT [~roman] , 
> [~Yanfei Lei] ?
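
Roughly, the proposed guard could look like this (a sketch with simplified 
names, not the exact method body):
{code:java}
// Sketch only: skip the upload entirely when the drained batch is empty, so no
// 1-byte changelog file (compression flag only) is ever written.
private void drainAndSave() throws Exception {
    Collection<UploadTask> tasks = new ArrayList<>();
    scheduled.drainTo(tasks);
    if (tasks.isEmpty()) {
        return; // everything was already uploaded by an earlier trigger
    }
    delegate.upload(tasks);
}
{code}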



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32769) PeriodicMaterializationManager pass descriptionFormat with invalid placeholder to MailboxExecutor#execute()

2023-08-07 Thread Feifan Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feifan Wang updated FLINK-32769:

Description: 
descriptionFormat in _MailboxExecutor#execute(ThrowingRunnable<? extends Exception> command, String descriptionFormat, Object... descriptionArgs)_ will 
be used in _String.format()_, which can't accept placeholders like "{}". But 
PeriodicMaterializationManager passed a descriptionFormat with the invalid 
placeholder ‘{}’.
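
For reference, _String.format()_ simply leaves the SLF4J-style placeholder in 
the output and drops the extra argument, so the description silently loses 
information (the message text here is illustrative):
{code:java}
String.format("Materialization triggered for checkpoint %d", 42L);
// -> "Materialization triggered for checkpoint 42"
String.format("Materialization triggered for checkpoint {}", 42L);
// -> "Materialization triggered for checkpoint {}" (the argument is ignored)
{code}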

 

Hi [~ym] and [~Yanfei Lei] , PTAL.

  was:descriptionFormat in _MailboxExecutor#execute(ThrowingRunnable<? extends Exception> command, String descriptionFormat, Object... descriptionArgs)_ will 
be used in _String.format()_ which can't accept placeholder like "{}". But 
PeriodicMaterializationManager passed the descriptionFormat with invalid 
placeholder ‘{}’.


> PeriodicMaterializationManager pass descriptionFormat with invalid 
> placeholder to MailboxExecutor#execute()
> ---
>
> Key: FLINK-32769
> URL: https://issues.apache.org/jira/browse/FLINK-32769
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Feifan Wang
>Priority: Major
>  Labels: pull-request-available
>
> descriptionFormat in _MailboxExecutor#execute(ThrowingRunnable<? extends Exception> command, String descriptionFormat, Object... descriptionArgs)_ 
> will be used in _String.format()_ which can't accept placeholder like "{}". 
> But PeriodicMaterializationManager passed the descriptionFormat with invalid 
> placeholder ‘{}’.
>  
> Hi [~ym] and [~Yanfei Lei] , PTAL.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32769) PeriodicMaterializationManager pass descriptionFormat with invalid placeholder to MailboxExecutor#execute()

2023-08-07 Thread Feifan Wang (Jira)
Feifan Wang created FLINK-32769:
---

 Summary: PeriodicMaterializationManager pass descriptionFormat 
with invalid placeholder to MailboxExecutor#execute()
 Key: FLINK-32769
 URL: https://issues.apache.org/jira/browse/FLINK-32769
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Reporter: Feifan Wang


descriptionFormat in _MailboxExecutor#execute(ThrowingRunnable<? extends Exception> command, String descriptionFormat, Object... descriptionArgs)_ will 
be used in _String.format()_ which can't accept placeholder like "{}". But 
PeriodicMaterializationManager passed the descriptionFormat with invalid 
placeholder ‘{}’.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1

2023-08-05 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751304#comment-17751304
 ] 

Feifan Wang commented on FLINK-29913:
-

Hi [~roman] , in addition to the backport PRs for 
[1.16|https://github.com/apache/flink/pull/23137] and 
[1.17|https://github.com/apache/flink/pull/23139], I also submitted [a PR to 
fix the outdated Javadoc in 
SharedStateRegistry|https://github.com/apache/flink/pull/23147], PTAL.

> Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
> -
>
> Key: FLINK-29913
> URL: https://issues.apache.org/jira/browse/FLINK-29913
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.0, 1.16.0, 1.17.0
>Reporter: Yanfei Lei
>Assignee: Feifan Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0, 1.16.3, 1.17.2
>
>
> When maxConcurrentCheckpoint>1, the shared state of Incremental rocksdb state 
> backend would be discarded by registering the same name handle. See 
> [https://github.com/apache/flink/pull/21050#discussion_r1011061072]
> cc [~roman] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-25862) Refactor SharedStateRegistry to not limit StreamStateHandle to register/unregister

2023-08-05 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25862?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751299#comment-17751299
 ] 

Feifan Wang commented on FLINK-25862:
-

Hi [~yunta] , I'd be glad to do this job, can you assign this ticket to me ?
 

> Refactor SharedStateRegistry to not limit StreamStateHandle to 
> register/unregister
> --
>
> Key: FLINK-25862
> URL: https://issues.apache.org/jira/browse/FLINK-25862
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: Yun Tang
>Priority: Minor
> Fix For: 1.18.0
>
>
> Current implementation of SharedStateRegistry would use `StreamStateHandle` 
> to register and unregister. This would limit the usage for other componments, 
> such as change-log state backend handle usage.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32761) Use UUID based on PhysicalStateHandleID as SharedStateRegistryKey ChangelogStateHandleStreamImpl

2023-08-05 Thread Feifan Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32761?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feifan Wang updated FLINK-32761:

Description: 
_ChangelogStateHandleStreamImpl#getKey()_ use 
_System.identityHashCode(stateHandle)_ as _SharedStateRegistryKey_ while 
stateHandle is not _FileStateHandle_ or {_}ByteStreamStateHandle{_}. That can 
easily lead to collision, although from the current code path, it only affects 
the test code.

In FLINK-29913 , we use UUID based on PhysicalStateHandleID as 
SharedStateRegistryKey in IncrementalRemoteKeyedStateHandle, we can reuse this 
method in ChangelogStateHandleStreamImpl.
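
A minimal sketch of reusing that approach (assuming the delegated handle 
exposes its _PhysicalStateHandleID_ via _getStreamStateHandleID()_; the rest 
of the names are illustrative):
{code:java}
// Sketch only: derive a stable, collision-resistant registry key from the
// physical handle id instead of System.identityHashCode(stateHandle).
SharedStateRegistryKey key =
        new SharedStateRegistryKey(
                UUID.nameUUIDFromBytes(
                                stateHandle
                                        .getStreamStateHandleID()
                                        .getKeyString()
                                        .getBytes(StandardCharsets.UTF_8))
                        .toString());
{code}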

 

WDYT [~roman]  ?

  was:
_ChangelogStateHandleStreamImpl#getKey()_ use 
_System.identityHashCode(stateHandle)_ as _SharedStateRegistryKey_ while 
stateHandle is not _FileStateHandle_ or {_}ByteStreamStateHandle{_}. That can 
easily lead to collision, although from the current code path, it only affects 
the test code.

In FLINK-29913 , we use md5 sum of PhysicalStateHandleID as 
SharedStateRegistryKey in IncrementalRemoteKeyedStateHandle, we can reuse this 
method in ChangelogStateHandleStreamImpl.

 

WDYT [~roman]  ?


> Use UUID based on PhysicalStateHandleID as SharedStateRegistryKey 
> ChangelogStateHandleStreamImpl
> 
>
> Key: FLINK-32761
> URL: https://issues.apache.org/jira/browse/FLINK-32761
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Feifan Wang
>Priority: Major
>
> _ChangelogStateHandleStreamImpl#getKey()_ use 
> _System.identityHashCode(stateHandle)_ as _SharedStateRegistryKey_ while 
> stateHandle is not _FileStateHandle_ or {_}ByteStreamStateHandle{_}. That can 
> easily lead to collision, although from the current code path, it only 
> affects the test code.
> In FLINK-29913 , we use UUID based on PhysicalStateHandleID as 
> SharedStateRegistryKey in IncrementalRemoteKeyedStateHandle, we can reuse 
> this method in ChangelogStateHandleStreamImpl.
>  
> WDYT [~roman]  ?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32761) Use UUID based on PhysicalStateHandleID as SharedStateRegistryKey ChangelogStateHandleStreamImpl

2023-08-05 Thread Feifan Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32761?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feifan Wang updated FLINK-32761:

Summary: Use UUID based on PhysicalStateHandleID as SharedStateRegistryKey 
ChangelogStateHandleStreamImpl  (was: Use md5 sum of PhysicalStateHandleID as 
SharedStateRegistryKey ChangelogStateHandleStreamImpl)

> Use UUID based on PhysicalStateHandleID as SharedStateRegistryKey 
> ChangelogStateHandleStreamImpl
> 
>
> Key: FLINK-32761
> URL: https://issues.apache.org/jira/browse/FLINK-32761
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Feifan Wang
>Priority: Major
>
> _ChangelogStateHandleStreamImpl#getKey()_ use 
> _System.identityHashCode(stateHandle)_ as _SharedStateRegistryKey_ while 
> stateHandle is not _FileStateHandle_ or {_}ByteStreamStateHandle{_}. That can 
> easily lead to collision, although from the current code path, it only 
> affects the test code.
> In FLINK-29913 , we use md5 sum of PhysicalStateHandleID as 
> SharedStateRegistryKey in IncrementalRemoteKeyedStateHandle, we can reuse 
> this method in ChangelogStateHandleStreamImpl.
>  
> WDYT [~roman]  ?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32761) Use md5 sum of PhysicalStateHandleID as SharedStateRegistryKey ChangelogStateHandleStreamImpl

2023-08-05 Thread Feifan Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32761?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feifan Wang updated FLINK-32761:

Description: 
_ChangelogStateHandleStreamImpl#getKey()_ use 
_System.identityHashCode(stateHandle)_ as _SharedStateRegistryKey_ while 
stateHandle is not _FileStateHandle_ or {_}ByteStreamStateHandle{_}. That can 
easily lead to collision, although from the current code path, it only affects 
the test code.

In FLINK-29913 , we use md5 sum of PhysicalStateHandleID as 
SharedStateRegistryKey in IncrementalRemoteKeyedStateHandle, we can reuse this 
method in ChangelogStateHandleStreamImpl.

 

WDYT [~roman]  ?

  was:
_ChangelogStateHandleStreamImpl#getKey()_ use 
_System.identityHashCode(stateHandle)_ as _SharedStateRegistryKey_ while 
stateHandle is not _FileStateHandle_ or {_}ByteStreamStateHandle{_}. That can 
easily lead to collision, although from the current code path, it only affects 
the test code.

In FLINK-29913 , we use md5 sum of PhysicalStateHandleID as 
SharedStateRegistryKey in IncrementalRemoteKeyedStateHandle, we can reuse this 
method in ChangelogStateHandleStreamImpl.


> Use md5 sum of PhysicalStateHandleID as SharedStateRegistryKey 
> ChangelogStateHandleStreamImpl
> -
>
> Key: FLINK-32761
> URL: https://issues.apache.org/jira/browse/FLINK-32761
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Feifan Wang
>Priority: Major
>
> _ChangelogStateHandleStreamImpl#getKey()_ use 
> _System.identityHashCode(stateHandle)_ as _SharedStateRegistryKey_ while 
> stateHandle is not _FileStateHandle_ or {_}ByteStreamStateHandle{_}. That can 
> easily lead to collision, although from the current code path, it only 
> affects the test code.
> In FLINK-29913 , we use md5 sum of PhysicalStateHandleID as 
> SharedStateRegistryKey in IncrementalRemoteKeyedStateHandle, we can reuse 
> this method in ChangelogStateHandleStreamImpl.
>  
> WDYT [~roman]  ?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32761) Use md5 sum of PhysicalStateHandleID as SharedStateRegistryKey ChangelogStateHandleStreamImpl

2023-08-05 Thread Feifan Wang (Jira)
Feifan Wang created FLINK-32761:
---

 Summary: Use md5 sum of PhysicalStateHandleID as 
SharedStateRegistryKey ChangelogStateHandleStreamImpl
 Key: FLINK-32761
 URL: https://issues.apache.org/jira/browse/FLINK-32761
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Reporter: Feifan Wang


_ChangelogStateHandleStreamImpl#getKey()_ use 
_System.identityHashCode(stateHandle)_ as _SharedStateRegistryKey_ while 
stateHandle is not _FileStateHandle_ or {_}ByteStreamStateHandle{_}. That can 
easily lead to collision, although from the current code path, it only affects 
the test code.

In FLINK-29913 , we use md5 sum of PhysicalStateHandleID as 
SharedStateRegistryKey in IncrementalRemoteKeyedStateHandle, we can reuse this 
method in ChangelogStateHandleStreamImpl.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1

2023-08-04 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1775#comment-1775
 ] 

Feifan Wang commented on FLINK-29913:
-

[~roman] ,  I submitted two PRs for 
[1.16|https://github.com/apache/flink/pull/23137] and 
[1.17|https://github.com/apache/flink/pull/23139], please take a look.

> Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
> -
>
> Key: FLINK-29913
> URL: https://issues.apache.org/jira/browse/FLINK-29913
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.0, 1.16.0, 1.17.0
>Reporter: Yanfei Lei
>Assignee: Feifan Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0, 1.16.3, 1.17.2
>
>
> When maxConcurrentCheckpoint>1, the shared state of Incremental rocksdb state 
> backend would be discarded by registering the same name handle. See 
> [https://github.com/apache/flink/pull/21050#discussion_r1011061072]
> cc [~roman] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1

2023-08-03 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17750647#comment-17750647
 ] 

Feifan Wang commented on FLINK-29913:
-

 Thanks to [~roman]  for helping to review the PR, I learned a lot from it.
{quote}do you mind creating backport PRs for branches 1.17 and 1.16?
{quote}
I am glad to.

> Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
> -
>
> Key: FLINK-29913
> URL: https://issues.apache.org/jira/browse/FLINK-29913
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.0, 1.16.0, 1.17.0
>Reporter: Yanfei Lei
>Assignee: Feifan Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0, 1.16.3, 1.17.2
>
>
> When maxConcurrentCheckpoint>1, the shared state of Incremental rocksdb state 
> backend would be discarded by registering the same name handle. See 
> [https://github.com/apache/flink/pull/21050#discussion_r1011061072]
> cc [~roman] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32681) RocksDBStateDownloaderTest.testMultiThreadCleanupOnFailure unstablie

2023-08-01 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17749541#comment-17749541
 ] 

Feifan Wang commented on FLINK-32681:
-

Sorry [~mapohl] , I didn't notice that the ticket had already been assigned 
before submitting the PR.

> RocksDBStateDownloaderTest.testMultiThreadCleanupOnFailure unstablie
> 
>
> Key: FLINK-32681
> URL: https://issues.apache.org/jira/browse/FLINK-32681
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends, Tests
>Affects Versions: 1.18.0
>Reporter: Chesnay Schepler
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.18.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=51712&view=logs&j=77a9d8e1-d610-59b3-fc2a-4766541e0e33&t=125e07e7-8de0-5c6c-a541-a567415af3ef
> Failed 3 times in yesterdays nightly run.
> {code}
> Jul 26 01:12:46 01:12:46.889 [ERROR] 
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloaderTest.testMultiThreadCleanupOnFailure
>   Time elapsed: 0.044 s  <<< FAILURE!
> Jul 26 01:12:46 java.lang.AssertionError
> Jul 26 01:12:46   at org.junit.Assert.fail(Assert.java:87)
> Jul 26 01:12:46   at org.junit.Assert.assertTrue(Assert.java:42)
> Jul 26 01:12:46   at org.junit.Assert.assertFalse(Assert.java:65)
> Jul 26 01:12:46   at org.junit.Assert.assertFalse(Assert.java:75)
> Jul 26 01:12:46   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloaderTest.testMultiThreadCleanupOnFailure(RocksDBStateDownloaderTest.java:151)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32681) RocksDBStateDownloaderTest.testMultiThreadCleanupOnFailure unstablie

2023-08-01 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17749522#comment-17749522
 ] 

Feifan Wang edited comment on FLINK-32681 at 8/1/23 8:03 AM:
-

Hi [~srichter] , I'm also very interested in this problem. I think it is 
caused by the target directory being created again by a download task that had 
already started. I think it can be solved with a cleaner that is aware of task 
failures, and I submitted a draft [PR|https://github.com/apache/flink/pull/23111] 
explaining the idea (perhaps as a final fix if you think it is appropriate). 
You can take a look if you don't mind. :)
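
The core of the idea, as a sketch with simplified, illustrative names:
{code:java}
// Sketch only: the cleanup waits until every already-started download task has
// finished (or failed) before deleting the target directories, so a racing
// task can no longer re-create a directory that was just deleted.
CompletableFuture
        .allOf(downloadFutures.toArray(new CompletableFuture[0]))
        .whenComplete((ignored, error) -> {
            if (error != null) {
                cleanUpPathsQuietly(targetDirectories); // hypothetical helper
            }
        });
{code}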


was (Author: feifan wang):
Hi [~srichter] , I'm also very interested in this problem. I think it is caused 
by the directory being created again by the download task that was already 
started. I think it can be solved with a cleaner that is aware of task failure, 
and I submitted a draft PR explaining the idea (perhaps as a final fix if 
people feel it is appropriate). You can take a look if you don't mind.

> RocksDBStateDownloaderTest.testMultiThreadCleanupOnFailure unstablie
> 
>
> Key: FLINK-32681
> URL: https://issues.apache.org/jira/browse/FLINK-32681
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends, Tests
>Affects Versions: 1.18.0
>Reporter: Chesnay Schepler
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.18.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=51712&view=logs&j=77a9d8e1-d610-59b3-fc2a-4766541e0e33&t=125e07e7-8de0-5c6c-a541-a567415af3ef
> Failed 3 times in yesterdays nightly run.
> {code}
> Jul 26 01:12:46 01:12:46.889 [ERROR] 
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloaderTest.testMultiThreadCleanupOnFailure
>   Time elapsed: 0.044 s  <<< FAILURE!
> Jul 26 01:12:46 java.lang.AssertionError
> Jul 26 01:12:46   at org.junit.Assert.fail(Assert.java:87)
> Jul 26 01:12:46   at org.junit.Assert.assertTrue(Assert.java:42)
> Jul 26 01:12:46   at org.junit.Assert.assertFalse(Assert.java:65)
> Jul 26 01:12:46   at org.junit.Assert.assertFalse(Assert.java:75)
> Jul 26 01:12:46   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloaderTest.testMultiThreadCleanupOnFailure(RocksDBStateDownloaderTest.java:151)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32681) RocksDBStateDownloaderTest.testMultiThreadCleanupOnFailure unstablie

2023-08-01 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17749522#comment-17749522
 ] 

Feifan Wang commented on FLINK-32681:
-

Hi [~srichter] , I'm also very interested in this problem. I think it is caused 
by the directory being created again by the download task that was already 
started. I think it can be solved with a cleaner that is aware of task failure, 
and I submitted a draft PR explaining the idea (perhaps as a final fix if 
people feel it is appropriate). You can take a look if you don't mind.

> RocksDBStateDownloaderTest.testMultiThreadCleanupOnFailure unstablie
> 
>
> Key: FLINK-32681
> URL: https://issues.apache.org/jira/browse/FLINK-32681
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends, Tests
>Affects Versions: 1.18.0
>Reporter: Chesnay Schepler
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.18.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=51712&view=logs&j=77a9d8e1-d610-59b3-fc2a-4766541e0e33&t=125e07e7-8de0-5c6c-a541-a567415af3ef
> Failed 3 times in yesterdays nightly run.
> {code}
> Jul 26 01:12:46 01:12:46.889 [ERROR] 
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloaderTest.testMultiThreadCleanupOnFailure
>   Time elapsed: 0.044 s  <<< FAILURE!
> Jul 26 01:12:46 java.lang.AssertionError
> Jul 26 01:12:46   at org.junit.Assert.fail(Assert.java:87)
> Jul 26 01:12:46   at org.junit.Assert.assertTrue(Assert.java:42)
> Jul 26 01:12:46   at org.junit.Assert.assertFalse(Assert.java:65)
> Jul 26 01:12:46   at org.junit.Assert.assertFalse(Assert.java:75)
> Jul 26 01:12:46   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloaderTest.testMultiThreadCleanupOnFailure(RocksDBStateDownloaderTest.java:151)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-25322) Support no-claim mode in changelog state backend

2023-07-27 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17747855#comment-17747855
 ] 

Feifan Wang commented on FLINK-25322:
-

{quote}One option is that checkpoints before state self-sustained will not be 
retained as retained checkpoints.
{quote}
On this basis, what if we do not execute notifyCheckpointCompleted for these 
non-state-self-sustained checkpoints ? These checkpoints would only be used 
for restarts and would not be retained after the job exits, nor would they 
trigger the final commit of TwoPhaseCommitSink. WDYT [~pnowojski] ?
 

> Support no-claim mode in changelog state backend
> 
>
> Key: FLINK-25322
> URL: https://issues.apache.org/jira/browse/FLINK-25322
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: Dawid Wysakowicz
>Assignee: Feifan Wang
>Priority: Major
> Fix For: 1.18.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32664) TableSourceJsonPlanTest.testReuseSourceWithoutProjectionPushDown is failing

2023-07-24 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17746776#comment-17746776
 ] 

Feifan Wang commented on FLINK-32664:
-

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=51673&view=logs&j=0c940707-2659-5648-cbe6-a1ad63045f0a&t=075c2716-8010-5565-fe08-3c4bb45824a4&l=11250

> TableSourceJsonPlanTest.testReuseSourceWithoutProjectionPushDown is failing
> ---
>
> Key: FLINK-32664
> URL: https://issues.apache.org/jira/browse/FLINK-32664
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Blocker
>  Labels: pull-request-available, test-stability
>
> Blocker since it's failing on every build and reproduced locally
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=51661&view=logs&j=0c940707-2659-5648-cbe6-a1ad63045f0a&t=075c2716-8010-5565-fe08-3c4bb45824a4&l=11529



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-25322) Support no-claim mode in changelog state backend

2023-07-24 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17746750#comment-17746750
 ] 

Feifan Wang commented on FLINK-25322:
-

Thanks for the reply [~masteryhx] .
{quote}Users have to check the status of restore mode by the Flink UI or the 
REST API, right ?
{quote}
Yes, if users want to delete the restored non-claimed checkpoint, they must 
check the Flink UI or REST API to confirm that the new job is state 
self-sustained. Otherwise, a new job that is not yet state self-sustained may 
fail to restart because its files cannot be found. But I don't think this adds 
complexity for the user, because before this the user also had to check the 
Flink UI or REST API to determine whether the new job had completed at least 
one checkpoint. In contrast, I think checking whether the job is state 
self-sustained is more intuitive.
{quote}If users stop the job before the 'slowest materilization of all 
subtasks', this behaves like LEGACY mode, otherwise this could behaves like 
NO_CLIAM mode, right ?
{quote}
In fact, I am also thinking about how to deal with retained checkpoints before 
the job reaches state self-sustained. One option is that checkpoints before 
state self-sustained will not be retained as retained checkpoints. But this 
will cause data duplication when a job using a transactional sink resumes 
from the restored checkpoint. Another option is to record in the checkpoint 
metadata which state artifacts are borrowed from the non-claimed checkpoint, 
and when the new checkpoint is used for claim mode recovery, those state 
artifacts borrowed from the non-claimed checkpoint will not be deleted. Do you 
have any thoughts on this issue [~pnowojski]  ?
{quote}Of course, IIUC, If users want to use NO_CLAIM mode, they'd like to 
retain a CP to let other jobs use.
{quote}
In fact, even if a user manually redeploys the same job after updating the 
business logic code, it is desirable to be able to use no-claim mode, because 
no-claim mode guarantees that the job can be rolled back when there is a 
problem with the new logic code.

> Support no-claim mode in changelog state backend
> 
>
> Key: FLINK-25322
> URL: https://issues.apache.org/jira/browse/FLINK-25322
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: Dawid Wysakowicz
>Assignee: Feifan Wang
>Priority: Major
> Fix For: 1.18.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-25322) Support no-claim mode in changelog state backend

2023-07-06 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17740482#comment-17740482
 ] 

Feifan Wang commented on FLINK-25322:
-

Thanks for the reply [~pnowojski] . I'd love to write a FLIP, but it seems I 
don't have permission to create FLIP pages; what should I do ?

> Support no-claim mode in changelog state backend
> 
>
> Key: FLINK-25322
> URL: https://issues.apache.org/jira/browse/FLINK-25322
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: Dawid Wysakowicz
>Assignee: Feifan Wang
>Priority: Major
> Fix For: 1.18.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-25322) Support no-claim mode in changelog state backend

2023-07-02 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17739364#comment-17739364
 ] 

Feifan Wang commented on FLINK-25322:
-

Sorry for the late reply [~pnowojski] .
{quote}just use the native savepoint code path for this
{quote}
Native savepoints can only be triggered by users and cannot be executed as 
quickly and frequently as log based checkpoints. Therefore, they can only be 
used in scenarios where the user actively restarts the job.
{quote}use claim mode for recovery/restarts
{quote}
Claim mode destroys the snapshot, which prevents users from starting multiple 
jobs from the same snapshot and from rolling a job back to old code (which is 
useful for some important jobs).
{quote}use/implement fast duplicating FS
{quote}
Fast duplicating is not always available; for example, the very commonly used 
Hadoop filesystem does not currently support fast copy (see 
[HDFS-2139|https://issues.apache.org/jira/browse/HDFS-2139]).
{quote}nobody has complained about no-claim mode not working for the changelog 
statebackend so far
{quote}
In fact, not supporting the no-claim mode is one of the obstacles to promoting 
log based checkpoints in our company. Our team provides Flink-related 
technical support to other teams in our company that develop Flink jobs, and 
we do have users who want to use both log based checkpoints and the no-claim 
mode. For example, one of our users runs jobs with many thousands of parallel 
subtasks. A single job is distributed across many machines due to the high 
parallelism, so it often encounters job restarts caused by machine failures. 
Before using log based checkpoints, they could only set the checkpoint 
interval to 10 minutes; any shorter and the checkpoint would become unstable. 
Each job restart caused roughly ten minutes of data to be reconsumed, which in 
turn caused tens of minutes of latency. We are trying log based checkpoints on 
these jobs, and everything is fine except that the no-claim mode is not 
supported.
{quote}That would require a bigger discussion I think.
{quote}
I'd love to initiate such a discussion. How do I go about it ?

> Support no-claim mode in changelog state backend
> 
>
> Key: FLINK-25322
> URL: https://issues.apache.org/jira/browse/FLINK-25322
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: Dawid Wysakowicz
>Assignee: Feifan Wang
>Priority: Major
> Fix For: 1.18.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32478) SourceCoordinatorAlignmentTest.testAnnounceCombinedWatermarkWithoutStart fails

2023-06-28 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17738373#comment-17738373
 ] 

Feifan Wang commented on FLINK-32478:
-

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50623&view=logs&j=0da23115-68bb-5dcd-192c-bd4c8adebde1&t=24c3384f-1bcb-57b3-224f-51bf973bbee8&l=8168

> SourceCoordinatorAlignmentTest.testAnnounceCombinedWatermarkWithoutStart fails
> --
>
> Key: FLINK-32478
> URL: https://issues.apache.org/jira/browse/FLINK-32478
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.18.0, 1.16.3, 1.17.2
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>
> SourceCoordinatorAlignmentTest.testAnnounceCombinedWatermarkWithoutStart fails
>  
> Root cause: multiple sources share the same thread pool, and the second 
> source cannot start because the first source closes the shared thread pool.
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50611&view=logs&j=0da23115-68bb-5dcd-192c-bd4c8adebde1&t=24c3384f-1bcb-57b3-224f-51bf973bbee8&l=8613



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-25322) Support no-claim mode in changelog state backend

2023-06-16 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17733588#comment-17733588
 ] 

Feifan Wang commented on FLINK-25322:
-

Hi [~dwysakowicz] , [~ym] , I saw that this issue has not progressed for a 
long time. I think it is very important to support the no-claim mode, and the 
fact that no-claim mode is the default mode is proof of that. If you don't 
mind, can you assign this ticket to me ?

 

As for the implementation, I have some different ideas. The current 
implementation of the no-claim mode makes a full snapshot in the first 
checkpoint after the job recovers from the retained snapshot, but there are 
some problems when a log based checkpoint makes a full snapshot:
 # State changelog data cannot be re-uploaded to durable storage, so if we 
want to make a full snapshot, we must either copy the changelog files of the 
retained checkpoint, or force a full materialization at the first checkpoint. 
It is okay if the changelog storage supports fast copy, but if it doesn't, the 
time overhead of copying these changelog files will be high, causing 
checkpoint timeouts.
 # If we choose to force a full materialization at the first checkpoint, we 
will need to upload a large amount of data, which also easily causes 
checkpoint timeouts. At the same time, the materialization operations that 
were originally staggered across the subtasks will be executed simultaneously, 
which increases the pressure on the checkpoint storage and leads to a longer 
materialization time.

The no-claim mode has two requirements. First, the new job cannot delete any 
files of the restored snapshot, otherwise multiple jobs cannot be started from 
the same snapshot. Second, the new job's checkpoints cannot reference files of 
the restored checkpoint, because in this mode the ownership of the restored 
snapshot belongs to the user, and the user may delete the snapshot later. We 
can call the ability of a new job to run completely independently of the 
restored snapshot "{*}state self-sustained{*}". When we implement the no-claim 
mode by forcing the first checkpoint to be a full checkpoint, the job becomes 
state self-sustained once the first checkpoint completes. Please note that new 
jobs are not state self-sustained as soon as they enter the running state. *I 
think that in the no-claim mode, the job does not have to become state 
self-sustained exactly at the first checkpoint; it should simply become state 
self-sustained as soon as possible.*

 

*Proposal:*

Based on the above explanation, I have the following proposal:

*1. ChangelogStateBackend implements the no-claim mode only by forcing the 
first materialization to be a full materialization*
 # Since the changelog data from before the materialization trigger is no 
longer required by checkpoints taken after the materialization completes, once 
all keyed state backends have completed a full materialization, the job no 
longer depends on the restored snapshot files.
 # Materialization is still performed asynchronously, so the longer 
materialization time caused by the full upload will not cause checkpoint 
timeouts.
 # In general, when log based checkpoints are enabled, the materialization 
interval is roughly the same as the checkpoint interval without log based 
checkpoints. Therefore, the time for a job to become state self-sustained in 
this way will not be longer than without the state changelog.

*2. Show whether the job is state self-sustained in the Flink UI and provide a 
REST API to query it.*

Before this, the user needs to check whether the job has completed at least 
one checkpoint to determine whether the restored checkpoint can be deleted, 
which is not straightforward enough. I think a flag in the Flink UI and a 
dedicated REST API are better. The SharedStateRegistry can be used to track 
whether a checkpoint still references the restored no-claim snapshot, and 
thereby determine whether the job is state self-sustained.
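
To make proposal 2 concrete, here is a minimal sketch of such tracking. The 
class and method names (RestoredStateTracker, markRestored, 
isStateSelfSustained) are hypothetical and only mirror the idea described 
above; they are not actual Flink APIs:
{code:java}
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical sketch: remember which registration keys came from the
 * restored (non-claimed) snapshot, and report whether the latest completed
 * checkpoint still references any of them.
 */
public class RestoredStateTracker {

    // keys of shared state artifacts belonging to the restored snapshot
    private final Set<String> restoredKeys = new HashSet<>();

    // keys referenced by the latest completed checkpoint
    private Set<String> latestCheckpointKeys = new HashSet<>();

    /** Called once on recovery for every shared state entry of the restored snapshot. */
    public void markRestored(String registrationKey) {
        restoredKeys.add(registrationKey);
    }

    /** Called when a checkpoint completes, with all keys it references. */
    public void onCheckpointCompleted(Set<String> referencedKeys) {
        latestCheckpointKeys = referencedKeys;
    }

    /** State self-sustained: no restored artifact is referenced anymore. */
    public boolean isStateSelfSustained() {
        for (String key : latestCheckpointKeys) {
            if (restoredKeys.contains(key)) {
                return false;
            }
        }
        return true;
    }
}
{code}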

 

WDYT [~dwysakowicz], [~ym], [~pnowojski], [~roman] ?

> Support no-claim mode in changelog state backend
> 
>
> Key: FLINK-25322
> URL: https://issues.apache.org/jira/browse/FLINK-25322
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: Dawid Wysakowicz
>Assignee: Yuan Mei
>Priority: Major
> Fix For: 1.18.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (FLINK-25322) Support no-claim mode in changelog state backend

2023-06-16 Thread Feifan Wang (Jira)


[ https://issues.apache.org/jira/browse/FLINK-25322 ]


Feifan Wang deleted comment on FLINK-25322:
-

was (Author: feifan wang):
Hi [~dwysakowicz] , [~ym] , I saw that this issue has not progressed for a 
long time. I think it is very important to support the no-claim mode, and the 
fact that no-claim mode is the default mode is proof of that. If you don't 
mind, can you assign this ticket to me ?

;

> Support no-claim mode in changelog state backend
> 
>
> Key: FLINK-25322
> URL: https://issues.apache.org/jira/browse/FLINK-25322
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: Dawid Wysakowicz
>Assignee: Yuan Mei
>Priority: Major
> Fix For: 1.18.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-25322) Support no-claim mode in changelog state backend

2023-06-16 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17733467#comment-17733467
 ] 

Feifan Wang commented on FLINK-25322:
-

Hi [~dwysakowicz] , [~ym] , I saw that this issue has not progressed for a 
long time. I think it is very important to support the no-claim mode, and the 
fact that no-claim mode is the default mode is proof of that. If you don't 
mind, can you assign this ticket to me ?

;

> Support no-claim mode in changelog state backend
> 
>
> Key: FLINK-25322
> URL: https://issues.apache.org/jira/browse/FLINK-25322
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: Dawid Wysakowicz
>Assignee: Yuan Mei
>Priority: Major
> Fix For: 1.18.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1

2023-05-28 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17726933#comment-17726933
 ] 

Feifan Wang commented on FLINK-29913:
-

Thanks for the information [~klion26] . I would like to add a case I recently 
thought of: since 1.17, users can trigger a checkpoint with an assigned 
checkpoint type (CONFIGURED, FULL, INCREMENTAL). If a user triggers a FULL 
checkpoint via the REST API, this forces a re-upload of snapshot files that 
have already been uploaded. Using a register key based on the remote file path 
is still valid in this case.

I have submitted a PR; please help review it [~roman] , [~klion26] .

> Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
> -
>
> Key: FLINK-29913
> URL: https://issues.apache.org/jira/browse/FLINK-29913
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.0, 1.16.0, 1.17.0
>Reporter: Yanfei Lei
>Assignee: Feifan Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.3, 1.17.2
>
>
> When maxConcurrentCheckpoint>1, the shared state of Incremental rocksdb state 
> backend would be discarded by registering the same name handle. See 
> [https://github.com/apache/flink/pull/21050#discussion_r1011061072]
> cc [~roman] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1

2023-05-23 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17725624#comment-17725624
 ] 

Feifan Wang commented on FLINK-29913:
-

Thanks [~roman] , I will prepare a PR.

> Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
> -
>
> Key: FLINK-29913
> URL: https://issues.apache.org/jira/browse/FLINK-29913
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.0, 1.16.0, 1.17.0
>Reporter: Yanfei Lei
>Assignee: Feifan Wang
>Priority: Major
> Fix For: 1.16.2, 1.17.2
>
>
> When maxConcurrentCheckpoint>1, the shared state of Incremental rocksdb state 
> backend would be discarded by registering the same name handle. See 
> [https://github.com/apache/flink/pull/21050#discussion_r1011061072]
> cc [~roman] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1

2023-05-23 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17725470#comment-17725470
 ] 

Feifan Wang edited comment on FLINK-29913 at 5/23/23 3:45 PM:
--

Thanks for the clarification [~roman] ! 
{quote}Further, regarding the approach of using unique registry key, I agree 
with Congxian Qiu , we can just choose a stable register key generation method 
based on remote file name (such as use md5 digest of remote file name) , which 
can replace of 
IncrementalRemoteKeyedStateHandle#createSharedStateRegistryKeyFromFileName() .
The mapping of local sst file name to StreamStateHandle never changed , so the 
part of RocksDB recovery does not need to be changed.
{quote}
I mean we still use the local file name as the key of the sharedState map in 
_*IncrementalRemoteKeyedStateHandle*_ and use the remote file path when 
generating the SharedStateRegistryKey. The changes in 
_*IncrementalRemoteKeyedStateHandle*_ would look like this:
{code:java}
...

// still use the local file name (StateHandleID) as the key of this map,
// corresponding to the "never changed" part mentioned above
private final Map<StateHandleID, StreamStateHandle> sharedState;

...

public void registerSharedStates(SharedStateRegistry stateRegistry, long checkpointID) {
    ...
    for (Map.Entry<StateHandleID, StreamStateHandle> sharedStateHandle :
            sharedState.entrySet()) {

        SharedStateRegistryKey registryKey =
                generateRegisterKey(sharedStateHandle.getValue());  // changed line

        StreamStateHandle reference =
                stateRegistry.registerReference(
                        registryKey, sharedStateHandle.getValue(), checkpointID);

        sharedStateHandle.setValue(reference);
    }
}

private static SharedStateRegistryKey generateRegisterKey(StreamStateHandle stateHandle) {
    String keyString;
    if (stateHandle instanceof FileStateHandle) {
        keyString = ((FileStateHandle) stateHandle).getFilePath().toString();
    } else if (stateHandle instanceof ByteStreamStateHandle) {
        keyString = ((ByteStreamStateHandle) stateHandle).getHandleName();
    } else {
        keyString = Integer.toString(System.identityHashCode(stateHandle));
    }
    return new SharedStateRegistryKey(md5sum(keyString)); // may be another digest algorithm
}

{code}
 

And we can use only normal handles (not PlaceholderStreamStateHandle) in 
IncrementalRemoteKeyedStateHandle to make sure the 
IncrementalRemoteKeyedStateHandle#generateRegisterKey() method never receives 
a PlaceholderStreamStateHandle.
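
For completeness, a minimal sketch of the md5sum helper referenced in the 
snippet above, using the standard JDK MessageDigest API; the helper itself is 
not part of Flink, and any stable digest would do:
{code:java}
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper: hex-encoded MD5 digest of the given key string.
private static String md5sum(String keyString) {
    try {
        MessageDigest md = MessageDigest.getInstance("MD5");
        byte[] digest = md.digest(keyString.getBytes(StandardCharsets.UTF_8));
        StringBuilder hex = new StringBuilder(digest.length * 2);
        for (byte b : digest) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    } catch (NoSuchAlgorithmException e) {
        // MD5 is required to be present in every JDK
        throw new IllegalStateException("MD5 not available", e);
    }
}
{code}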


was (Author: feifan wang):
Thanks for the clarification [~roman] ! 
{quote}Further, regarding the approach of using unique registry key, I agree 
with Congxian Qiu , we can just choose a stable register key generation method 
based on remote file name (such as use md5 digest of remote file name) , which 
can replace of 
IncrementalRemoteKeyedStateHandle#createSharedStateRegistryKeyFromFileName() .
The mapping of local sst file name to StreamStateHandle never changed , so the 
part of RocksDB recovery does not need to be changed.
{quote}
I mean we still use the local file name as the key of the sharedState map in 
_*IncrementalRemoteKeyedStateHandle*_ and use the remote file path when 
generating the SharedStateRegistryKey. The changes in 
_*IncrementalRemoteKeyedStateHandle*_ would look like this:
{code:java}
...

// still use the local file name (StateHandleID) as the key of this map,
// corresponding to the "never changed" part mentioned above
private final Map<StateHandleID, StreamStateHandle> sharedState;

...

public void registerSharedStates(SharedStateRegistry stateRegistry, long checkpointID) {
    ...
    for (Map.Entry<StateHandleID, StreamStateHandle> sharedStateHandle :
            sharedState.entrySet()) {

        SharedStateRegistryKey registryKey =
                generateRegisterKey(sharedStateHandle.getValue());  // changed line

        StreamStateHandle reference =
                stateRegistry.registerReference(
                        registryKey, sharedStateHandle.getValue(), checkpointID);

        sharedStateHandle.setValue(reference);
    }
}

private static SharedStateRegistryKey generateRegisterKey(StreamStateHandle stateHandle) {
    String keyString;
    if (stateHandle instanceof FileStateHandle) {
        keyString = ((FileStateHandle) stateHandle).getFilePath().toString();
    } else if (stateHandle instanceof ByteStreamStateHandle) {
        keyString = ((ByteStreamStateHandle) stateHandle).getHandleName();
    } else {
        keyString = Integer.toString(System.identityHashCode(stateHandle));
    }
    return new SharedStateRegistryKey(md5sum(keyString)); // may be another digest algorithm
}

{code}

> Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
> -
>
> Key: FLINK-29913
> URL: https://issues.apache.org/jira/browse/FLINK-29913
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.0, 1.16.0, 1.17.0
> 

[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1

2023-05-23 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17725470#comment-17725470
 ] 

Feifan Wang commented on FLINK-29913:
-

Thanks for the clarification [~roman] ! 
{quote}Further, regarding the approach of using unique registry key, I agree 
with Congxian Qiu , we can just choose a stable register key generation method 
based on remote file name (such as use md5 digest of remote file name) , which 
can replace of 
IncrementalRemoteKeyedStateHandle#createSharedStateRegistryKeyFromFileName() .
The mapping of local sst file name to StreamStateHandle never changed , so the 
part of RocksDB recovery does not need to be changed.
{quote}
I mean we still use the local file name as the key of the sharedState map in 
_*IncrementalRemoteKeyedStateHandle*_ and use the remote file path when 
generating the SharedStateRegistryKey. The changes in 
_*IncrementalRemoteKeyedStateHandle*_ would look like this:
{code:java}
...

// still use the local file name (StateHandleID) as the key of this map,
// corresponding to the "never changed" part mentioned above
private final Map<StateHandleID, StreamStateHandle> sharedState;

...

public void registerSharedStates(SharedStateRegistry stateRegistry, long checkpointID) {
    ...
    for (Map.Entry<StateHandleID, StreamStateHandle> sharedStateHandle :
            sharedState.entrySet()) {

        SharedStateRegistryKey registryKey =
                generateRegisterKey(sharedStateHandle.getValue());  // changed line

        StreamStateHandle reference =
                stateRegistry.registerReference(
                        registryKey, sharedStateHandle.getValue(), checkpointID);

        sharedStateHandle.setValue(reference);
    }
}

private static SharedStateRegistryKey generateRegisterKey(StreamStateHandle stateHandle) {
    String keyString;
    if (stateHandle instanceof FileStateHandle) {
        keyString = ((FileStateHandle) stateHandle).getFilePath().toString();
    } else if (stateHandle instanceof ByteStreamStateHandle) {
        keyString = ((ByteStreamStateHandle) stateHandle).getHandleName();
    } else {
        keyString = Integer.toString(System.identityHashCode(stateHandle));
    }
    return new SharedStateRegistryKey(md5sum(keyString)); // may be another digest algorithm
}

{code}

> Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
> -
>
> Key: FLINK-29913
> URL: https://issues.apache.org/jira/browse/FLINK-29913
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.0, 1.16.0, 1.17.0
>Reporter: Yanfei Lei
>Assignee: Feifan Wang
>Priority: Major
> Fix For: 1.16.2, 1.17.2
>
>
> When maxConcurrentCheckpoint>1, the shared state of Incremental rocksdb state 
> backend would be discarded by registering the same name handle. See 
> [https://github.com/apache/flink/pull/21050#discussion_r1011061072]
> cc [~roman] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1

2023-05-23 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17725387#comment-17725387
 ] 

Feifan Wang commented on FLINK-29913:
-

One overhead I can see is that it will use more memory to store the next 
pointer: on a 64-bit system, about 7.63 MiB more memory per one million 
entries (8 bytes per reference × 1,000,000 entries), which I think is 
acceptable. Is there any other runtime overhead I missed ?

As for the complexity, this approach will indeed add linked-list operations to 
the _registerReference()_ and _unregisterUnusedState()_ methods. But given 
that it is easy to implement, and the implementation is cohesive, I think the 
complexity is acceptable.

 

Just to clarify, I think using a unique ID is also a valid approach, but I 
want to learn how you made the selection. Further, regarding the approach of 
using a unique registry key, I agree with [~klion26] : we can just choose a 
stable register key generation method based on the remote file name (such as 
an md5 digest of the remote file name), which can replace 
IncrementalRemoteKeyedStateHandle#createSharedStateRegistryKeyFromFileName(). 
The mapping of local sst file names to StreamStateHandles never changes, so 
the RocksDB recovery part does not need to be changed.

 

Whichever approach is chosen, I am happy to implement it. Can you assign this 
ticket to me, [~roman] ? Looking forward to hearing from you.

> Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
> -
>
> Key: FLINK-29913
> URL: https://issues.apache.org/jira/browse/FLINK-29913
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.0, 1.16.0
>Reporter: Yanfei Lei
>Priority: Minor
>
> When maxConcurrentCheckpoint>1, the shared state of Incremental rocksdb state 
> backend would be discarded by registering the same name handle. See 
> [https://github.com/apache/flink/pull/21050#discussion_r1011061072]
> cc [~roman] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1

2023-05-23 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17725286#comment-17725286
 ] 

Feifan Wang commented on FLINK-29913:
-

Sorry, I didn't notice this ticket earlier, so I submitted a duplicate one.

[~Yanfei Lei] , for the priority I agree with [~klion26] : since it may break 
checkpoints in a valid use case, the priority should be at least *Major* .

[~roman] I think your proposal is valid, but I still want to provide an 
alternative (see the sketch below):
 # Make SharedStateRegistry allow registering multiple state objects under the 
same key. The generation strategy for the SharedStateRegistryKey is still 
determined by the CompositeStateHandle, as it is now. Different state objects 
may be registered under the same key in a forced full checkpoint (active 
re-upload).
 # SharedStateRegistry maintains a linked entry list sorted by registration 
time for each key. A PlaceholderStreamStateHandle will be replaced by the last 
one in the entry list.

With this approach, the SharedStateRegistry does not discard any state object 
during registration; it only deletes a state object when its 
lastUsedCheckpoint is subsumed.
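
A minimal sketch of that linked-entry idea, with simplified types (Object 
stands in for Flink's StreamStateHandle, and deletion scheduling is omitted); 
the real SharedStateRegistryImpl carries much more bookkeeping:
{code:java}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch only: keep every handle registered under a key in a time-ordered
 * list instead of discarding older ones.
 */
public class MultiEntrySharedStateRegistry {

    static final class Entry {
        final Object stateHandle;   // the registered state object
        long lastUsedCheckpointId;  // highest checkpoint id referencing it

        Entry(Object stateHandle, long checkpointId) {
            this.stateHandle = stateHandle;
            this.lastUsedCheckpointId = checkpointId;
        }
    }

    // per key: entries ordered by registration time (newest last)
    private final Map<String, Deque<Entry>> entries = new HashMap<>();

    /** Register without discarding; a placeholder resolves to the newest real entry. */
    public Object registerReference(
            String key, Object handle, long checkpointId, boolean isPlaceholder) {
        Deque<Entry> list = entries.computeIfAbsent(key, k -> new ArrayDeque<>());
        if (isPlaceholder) {
            Entry newest = list.peekLast();
            if (newest == null) {
                throw new IllegalStateException("placeholder without prior registration");
            }
            newest.lastUsedCheckpointId = Math.max(newest.lastUsedCheckpointId, checkpointId);
            return newest.stateHandle;
        }
        list.addLast(new Entry(handle, checkpointId));
        return handle;
    }

    /** Delete an entry only when every checkpoint using it has been subsumed. */
    public void unregisterUnusedState(long lowestRetainedCheckpointId) {
        for (Deque<Entry> list : entries.values()) {
            // handles removed here would be scheduled for deletion
            list.removeIf(e -> e.lastUsedCheckpointId < lowestRetainedCheckpointId);
        }
    }
}
{code}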

> Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
> -
>
> Key: FLINK-29913
> URL: https://issues.apache.org/jira/browse/FLINK-29913
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.0, 1.16.0
>Reporter: Yanfei Lei
>Priority: Minor
>
> When maxConcurrentCheckpoint>1, the shared state of Incremental rocksdb state 
> backend would be discarded by registering the same name handle. See 
> [https://github.com/apache/flink/pull/21050#discussion_r1011061072]
> cc [~roman] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-32130) previous checkpoint will be broke by the subsequent incremental checkpoint

2023-05-22 Thread Feifan Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32130?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feifan Wang closed FLINK-32130.
---
Resolution: Duplicate

Duplicate of FLINK-29913

> previous checkpoint will be broke by the subsequent incremental checkpoint
> --
>
> Key: FLINK-32130
> URL: https://issues.apache.org/jira/browse/FLINK-32130
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Reporter: Feifan Wang
>Priority: Major
>
> Currently, _SharedStateRegistryImpl_ will discard old one while register new 
> state to same key:
> {code:java}
> // Old entry is not in a confirmed checkpoint yet, and the new one differs.
> // This might result from (omitted KG range here for simplicity):
> // 1. Flink recovers from a failure using a checkpoint 1
> // 2. State Backend is initialized to UID xyz and a set of SST: { 01.sst }
> // 3. JM triggers checkpoint 2
> // 4. TM sends handle: "xyz-002.sst"; JM registers it under "xyz-002.sst"
> // 5. TM crashes; everything is repeated from (2)
> // 6. TM recovers from CP 1 again: backend UID "xyz", SST { 01.sst }
> // 7. JM triggers checkpoint 3
> // 8. TM sends NEW state "xyz-002.sst"
> // 9. JM discards it as duplicate
> // 10. checkpoint completes, but a wrong SST file is used
> // So we use a new entry and discard the old one:
> LOG.info(
> "Duplicated registration under key {} of a new state: {}. "
> + "This might happen during the task failover if state 
> backend creates different states with the same key before and after the 
> failure. "
> + "Discarding the OLD state and keeping the NEW one which is 
> included into a completed checkpoint",
> registrationKey,
> newHandle);
> scheduledStateDeletion = entry.stateHandle;
> entry.stateHandle = newHandle; {code}
> But if _execution.checkpointing.max-concurrent-checkpoints_ > 1, the 
> following case will fail (take _RocksDBStateBackend_ as an example):
>  # cp1 triggers: 1.sst is uploaded to file-1, <1.sst,file-1> is registered, 
> and cp1 references file-1
>  # cp1 is not yet complete when cp2 triggers: 1.sst is uploaded to file-2, 
> and registering <1.sst,file-2> makes SharedStateRegistry discard file-1
>  # cp1 completes and cp2 fails, but cp1 is broken (file-1 has been deleted)
> I added a test to reproduce the problem ( 
> [pr-22606|https://github.com/apache/flink/pull/22606] ).
> I think we should allow registering multiple state objects under the same 
> key, WDYT [~pnowojski], [~roman]  ?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32130) previous checkpoint will be broke by the subsequent incremental checkpoint

2023-05-22 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17725250#comment-17725250
 ] 

Feifan Wang commented on FLINK-32130:
-

Sorry [~roman] , I didn't notice FLINK-29913 earlier, this ticket is indeed the 
same issue as FLINK-29913. I will close this ticket and move discussion to 
FLINK-29913.

> previous checkpoint will be broke by the subsequent incremental checkpoint
> --
>
> Key: FLINK-32130
> URL: https://issues.apache.org/jira/browse/FLINK-32130
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Reporter: Feifan Wang
>Priority: Major
>
> Currently, _SharedStateRegistryImpl_ will discard old one while register new 
> state to same key:
> {code:java}
> // Old entry is not in a confirmed checkpoint yet, and the new one differs.
> // This might result from (omitted KG range here for simplicity):
> // 1. Flink recovers from a failure using a checkpoint 1
> // 2. State Backend is initialized to UID xyz and a set of SST: { 01.sst }
> // 3. JM triggers checkpoint 2
> // 4. TM sends handle: "xyz-002.sst"; JM registers it under "xyz-002.sst"
> // 5. TM crashes; everything is repeated from (2)
> // 6. TM recovers from CP 1 again: backend UID "xyz", SST { 01.sst }
> // 7. JM triggers checkpoint 3
> // 8. TM sends NEW state "xyz-002.sst"
> // 9. JM discards it as duplicate
> // 10. checkpoint completes, but a wrong SST file is used
> // So we use a new entry and discard the old one:
> LOG.info(
> "Duplicated registration under key {} of a new state: {}. "
> + "This might happen during the task failover if state 
> backend creates different states with the same key before and after the 
> failure. "
> + "Discarding the OLD state and keeping the NEW one which is 
> included into a completed checkpoint",
> registrationKey,
> newHandle);
> scheduledStateDeletion = entry.stateHandle;
> entry.stateHandle = newHandle; {code}
> But if _execution.checkpointing.max-concurrent-checkpoints_ > 1, the 
> following case will fail (take _RocksDBStateBackend_ as an example):
>  # cp1 triggers: 1.sst is uploaded to file-1, <1.sst,file-1> is registered, 
> and cp1 references file-1
>  # cp1 is not yet complete when cp2 triggers: 1.sst is uploaded to file-2, 
> and registering <1.sst,file-2> makes SharedStateRegistry discard file-1
>  # cp1 completes and cp2 fails, but cp1 is broken (file-1 has been deleted)
> I added a test to reproduce the problem ( 
> [pr-22606|https://github.com/apache/flink/pull/22606] ).
> I think we should allow registering multiple state objects under the same 
> key, WDYT [~pnowojski], [~roman]  ?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32141) SharedStateRegistry print too much info log

2023-05-20 Thread Feifan Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feifan Wang updated FLINK-32141:

Description: 
FLINK-29095 added some log to SharedStateRegistry for trouble shooting. Among 
them, a info log be added when newHandle is equal to the registered one:

[https://github.com/apache/flink/blob/release-1.17.0/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java#L117]

!image-2023-05-21-00-26-20-026.png|width=775,height=126!

But this case cannot be considered as a potential bug, because 
FsStateChangelogStorage will directly use the FileStateHandle of the previous 
checkpoint instead of PlaceholderStreamStateHandle.

In our tests, JobManager printed so much of this log that useful information 
was overwhelmed.

So I suggest change this log level to trace, WDYT [~Yanfei Lei], [~klion26] ?

  was:
FLINK-29095 added some logs to SharedStateRegistry for troubleshooting. Among 
them, an info log is emitted when the newHandle is equal to the registered 
one:

[https://github.com/apache/flink/blob/release-1.17.0/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java#L117]

!image-2023-05-21-00-26-20-026.png|width=775,height=126!

But this case cannot be considered a possible bug, because 
FsStateChangelogStorage directly uses the FileStateHandle of the previous 
checkpoint instead of a PlaceholderStreamStateHandle.

In our tests, the JobManager printed so much of this log that useful 
information was overwhelmed.

So I suggest changing this log level to trace. WDYT [~Yanfei Lei], [~klion26] ?


> SharedStateRegistry print too much info log
> ---
>
> Key: FLINK-32141
> URL: https://issues.apache.org/jira/browse/FLINK-32141
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.17.0
>Reporter: Feifan Wang
>Priority: Major
> Fix For: 1.17.1
>
> Attachments: image-2023-05-21-00-26-20-026.png
>
>
> FLINK-29095 added some logs to SharedStateRegistry for troubleshooting. 
> Among them, an info log is emitted when the newHandle is equal to the 
> registered one:
> [https://github.com/apache/flink/blob/release-1.17.0/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java#L117]
> !image-2023-05-21-00-26-20-026.png|width=775,height=126!
> But this case cannot be considered a potential bug, because 
> FsStateChangelogStorage directly uses the FileStateHandle of the previous 
> checkpoint instead of a PlaceholderStreamStateHandle.
> In our tests, the JobManager printed so much of this log that useful 
> information was overwhelmed.
> So I suggest changing this log level to trace. WDYT [~Yanfei Lei], [~klion26] ?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32141) SharedStateRegistry print too much info log

2023-05-20 Thread Feifan Wang (Jira)
Feifan Wang created FLINK-32141:
---

 Summary: SharedStateRegistry print too much info log
 Key: FLINK-32141
 URL: https://issues.apache.org/jira/browse/FLINK-32141
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.17.0
Reporter: Feifan Wang
 Fix For: 1.17.1
 Attachments: image-2023-05-21-00-26-20-026.png

FLINK-29095 added some logs to SharedStateRegistry for troubleshooting. Among 
them, an info log is emitted when the newHandle is equal to the registered 
one:

[https://github.com/apache/flink/blob/release-1.17.0/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java#L117]

!image-2023-05-21-00-26-20-026.png|width=775,height=126!

But this case cannot be considered a possible bug, because 
FsStateChangelogStorage directly uses the FileStateHandle of the previous 
checkpoint instead of a PlaceholderStreamStateHandle.

In our tests, the JobManager printed so much of this log that useful 
information was overwhelmed.

So I suggest changing this log level to trace. WDYT [~Yanfei Lei], [~klion26] ?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-31743) Avoid relocating the RocksDB's log failure when filename exceeds 255 characters

2023-05-19 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31743?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17724325#comment-17724325
 ] 

Feifan Wang edited comment on FLINK-31743 at 5/19/23 3:41 PM:
--

Thanks for the reminder [~yunta] . I created a new ticket; can you assign it 
to me ?
And I have submitted a [PR on 
FRocksDB|https://github.com/ververica/frocksdb/pull/66]; can you help me 
review it ?


was (Author: feifan wang):
Thanks for the reminder. I created a new ticket; can you assign it to me ?
And I have submitted a [PR on 
FRocksDB|https://github.com/ververica/frocksdb/pull/66]; can you help me 
review it ?

> Avoid relocating the RocksDB's log failure when filename exceeds 255 
> characters
> ---
>
> Key: FLINK-31743
> URL: https://issues.apache.org/jira/browse/FLINK-31743
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.16.1, 1.15.4
>Reporter: jinghaihang
>Assignee: Feifan Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.2, 1.18.0, 1.17.1
>
>
> Since FLINK-24785 , the file name of the rocksdb LOG is generated by parsing 
> the db path, when the db path is long and the filename exceeds 255 
> characters, the creation of the file will fail, so the relevant rocksdb LOG 
> cannot be seen in the flink log dir.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-31743) Avoid relocating the RocksDB's log failure when filename exceeds 255 characters

2023-05-19 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31743?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17724325#comment-17724325
 ] 

Feifan Wang edited comment on FLINK-31743 at 5/19/23 3:40 PM:
--

Thanks for the reminder. I created a new ticket; can you assign it to me ?
And I have submitted a [PR on 
FRocksDB|https://github.com/ververica/frocksdb/pull/66]; can you help me 
review it ?


was (Author: feifan wang):
Thank for reminding, I created a new ticket, can you assign it to me ?
And I hive submitted a [PR on 
FRocksDB|https://github.com/ververica/frocksdb/pull/66], can you help me review 
it ?

> Avoid relocating the RocksDB's log failure when filename exceeds 255 
> characters
> ---
>
> Key: FLINK-31743
> URL: https://issues.apache.org/jira/browse/FLINK-31743
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.16.1, 1.15.4
>Reporter: jinghaihang
>Assignee: Feifan Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.2, 1.18.0, 1.17.1
>
>
> Since FLINK-24785 , the file name of the rocksdb LOG is generated by parsing 
> the db path, when the db path is long and the filename exceeds 255 
> characters, the creation of the file will fail, so the relevant rocksdb LOG 
> cannot be seen in the flink log dir.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31743) Avoid relocating the RocksDB's log failure when filename exceeds 255 characters

2023-05-19 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31743?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17724325#comment-17724325
 ] 

Feifan Wang commented on FLINK-31743:
-

Thanks for the reminder. I created a new ticket; can you assign it to me ?
And I have submitted a [PR on 
FRocksDB|https://github.com/ververica/frocksdb/pull/66]; can you help me 
review it ?

> Avoid relocating the RocksDB's log failure when filename exceeds 255 
> characters
> ---
>
> Key: FLINK-31743
> URL: https://issues.apache.org/jira/browse/FLINK-31743
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.16.1, 1.15.4
>Reporter: jinghaihang
>Assignee: Feifan Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.2, 1.18.0, 1.17.1
>
>
> Since FLINK-24785 , the file name of the rocksdb LOG is generated by parsing 
> the db path, when the db path is long and the filename exceeds 255 
> characters, the creation of the file will fail, so the relevant rocksdb LOG 
> cannot be seen in the flink log dir.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32135) FRocksDB fix log file create failed caused by file name too long

2023-05-19 Thread Feifan Wang (Jira)
Feifan Wang created FLINK-32135:
---

 Summary: FRocksDB fix log file create failed caused by file name 
too long
 Key: FLINK-32135
 URL: https://issues.apache.org/jira/browse/FLINK-32135
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Reporter: Feifan Wang


RocksDB uses the instance path as the log file name when a log path is 
specified, but if the instance path is too long and the resulting file name 
exceeds the filesystem's limit, log file creation will fail.

We disabled log relocating when the RocksDB instance path is too long in 
FLINK-31743, but that was just a hotfix. This ticket proposes to solve the 
problem in FRocksDB itself.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32130) previous checkpoint will be broke by the subsequent incremental checkpoint

2023-05-19 Thread Feifan Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32130?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feifan Wang updated FLINK-32130:

Component/s: Runtime / Checkpointing

> previous checkpoint will be broke by the subsequent incremental checkpoint
> --
>
> Key: FLINK-32130
> URL: https://issues.apache.org/jira/browse/FLINK-32130
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Reporter: Feifan Wang
>Priority: Major
>
> Currently, _SharedStateRegistryImpl_ will discard old one while register new 
> state to same key:
> {code:java}
> // Old entry is not in a confirmed checkpoint yet, and the new one differs.
> // This might result from (omitted KG range here for simplicity):
> // 1. Flink recovers from a failure using a checkpoint 1
> // 2. State Backend is initialized to UID xyz and a set of SST: { 01.sst }
> // 3. JM triggers checkpoint 2
> // 4. TM sends handle: "xyz-002.sst"; JM registers it under "xyz-002.sst"
> // 5. TM crashes; everything is repeated from (2)
> // 6. TM recovers from CP 1 again: backend UID "xyz", SST { 01.sst }
> // 7. JM triggers checkpoint 3
> // 8. TM sends NEW state "xyz-002.sst"
> // 9. JM discards it as duplicate
> // 10. checkpoint completes, but a wrong SST file is used
> // So we use a new entry and discard the old one:
> LOG.info(
> "Duplicated registration under key {} of a new state: {}. "
> + "This might happen during the task failover if state 
> backend creates different states with the same key before and after the 
> failure. "
> + "Discarding the OLD state and keeping the NEW one which is 
> included into a completed checkpoint",
> registrationKey,
> newHandle);
> scheduledStateDeletion = entry.stateHandle;
> entry.stateHandle = newHandle; {code}
> But if _execution.checkpointing.max-concurrent-checkpoints_ > 1, the 
> following case will fail (take _RocksDBStateBackend_ as an example):
>  # cp1 triggers: 1.sst is uploaded to file-1, <1.sst,file-1> is registered, 
> and cp1 references file-1
>  # cp1 is not yet complete when cp2 triggers: 1.sst is uploaded to file-2, 
> and registering <1.sst,file-2> makes SharedStateRegistry discard file-1
>  # cp1 completes and cp2 fails, but cp1 is broken (file-1 has been deleted)
> I added a test to reproduce the problem ( 
> [pr-22606|https://github.com/apache/flink/pull/22606] ).
> I think we should allow registering multiple state objects under the same 
> key, WDYT [~pnowojski], [~roman]  ?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32130) previous checkpoint will be broke by the subsequent incremental checkpoint

2023-05-18 Thread Feifan Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32130?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feifan Wang updated FLINK-32130:

Description: 
Currently, _SharedStateRegistryImpl_ will discard old one while register new 
state to same key:
{code:java}
// Old entry is not in a confirmed checkpoint yet, and the new one differs.
// This might result from (omitted KG range here for simplicity):
// 1. Flink recovers from a failure using a checkpoint 1
// 2. State Backend is initialized to UID xyz and a set of SST: { 01.sst }
// 3. JM triggers checkpoint 2
// 4. TM sends handle: "xyz-002.sst"; JM registers it under "xyz-002.sst"
// 5. TM crashes; everything is repeated from (2)
// 6. TM recovers from CP 1 again: backend UID "xyz", SST { 01.sst }
// 7. JM triggers checkpoint 3
// 8. TM sends NEW state "xyz-002.sst"
// 9. JM discards it as duplicate
// 10. checkpoint completes, but a wrong SST file is used
// So we use a new entry and discard the old one:
LOG.info(
"Duplicated registration under key {} of a new state: {}. "
+ "This might happen during the task failover if state backend 
creates different states with the same key before and after the failure. "
+ "Discarding the OLD state and keeping the NEW one which is 
included into a completed checkpoint",
registrationKey,
newHandle);
scheduledStateDeletion = entry.stateHandle;
entry.stateHandle = newHandle; {code}
But if _execution.checkpointing.max-concurrent-checkpoints_ > 1, the following 
case will fail (take _RocksDBStateBackend_ as an example):
 # cp1 triggers: 1.sst is uploaded to file-1, <1.sst,file-1> is registered, 
and cp1 references file-1
 # cp1 is not yet complete when cp2 triggers: 1.sst is uploaded to file-2, and 
registering <1.sst,file-2> makes SharedStateRegistry discard file-1
 # cp1 completes and cp2 fails, but cp1 is broken (file-1 has been deleted)

I added a test to reproduce the problem ( 
[pr-22606|https://github.com/apache/flink/pull/22606] ).

I think we should allow registering multiple state objects under the same key, 
WDYT [~pnowojski], [~roman]  ?

  was:
Currently, _SharedStateRegistryImpl_ will discard old one while register new 
state to same key:
{code:java}
// Old entry is not in a confirmed checkpoint yet, and the new one differs.
// This might result from (omitted KG range here for simplicity):
// 1. Flink recovers from a failure using a checkpoint 1
// 2. State Backend is initialized to UID xyz and a set of SST: { 01.sst }
// 3. JM triggers checkpoint 2
// 4. TM sends handle: "xyz-002.sst"; JM registers it under "xyz-002.sst"
// 5. TM crashes; everything is repeated from (2)
// 6. TM recovers from CP 1 again: backend UID "xyz", SST { 01.sst }
// 7. JM triggers checkpoint 3
// 8. TM sends NEW state "xyz-002.sst"
// 9. JM discards it as duplicate
// 10. checkpoint completes, but a wrong SST file is used
// So we use a new entry and discard the old one:
LOG.info(
"Duplicated registration under key {} of a new state: {}. "
+ "This might happen during the task failover if state backend 
creates different states with the same key before and after the failure. "
+ "Discarding the OLD state and keeping the NEW one which is 
included into a completed checkpoint",
registrationKey,
newHandle);
scheduledStateDeletion = entry.stateHandle;
entry.stateHandle = newHandle; {code}
But if _execution.checkpointing.max-concurrent-checkpoints_ > 1, the following 
case will fail (take _RocksDBStateBackend_ as an example):
 # cp1 triggers: 1.sst is uploaded to file-1, <1.sst,file-1> is registered, 
and cp1 references file-1
 # cp1 is not yet complete when cp2 triggers: 1.sst is uploaded to file-2, and 
registering <1.sst,file-2> makes SharedStateRegistry discard file-1
 # cp1 completes and cp2 fails, but cp1 is broken (file-1 has been deleted)

I think we should allow registering multiple state objects under the same key, 
WDYT [~pnowojski], [~roman]  ?


> previous checkpoint will be broke by the subsequent incremental checkpoint
> --
>
> Key: FLINK-32130
> URL: https://issues.apache.org/jira/browse/FLINK-32130
> Project: Flink
>  Issue Type: Bug
>Reporter: Feifan Wang
>Priority: Major
>
> Currently, _SharedStateRegistryImpl_ will discard old one while register new 
> state to same key:
> {code:java}
> // Old entry is not in a confirmed checkpoint yet, and the new one differs.
> // This might result from (omitted KG range here for simplicity):
> // 1. Flink recovers from a failure using a checkpoint 1
> // 2. State Backend is initialized to UID xyz and a set of SST: { 01.sst }
> // 3. JM triggers checkpoint 2
> // 4. TM sends handle: "xyz-002.sst"; JM registers it under "xyz-002.sst"
> // 5. TM crashes; everything is repeated from (2)
> // 6. TM recovers from CP 1 again: b

[jira] [Created] (FLINK-32130) previous checkpoint will be broke by the subsequent incremental checkpoint

2023-05-18 Thread Feifan Wang (Jira)
Feifan Wang created FLINK-32130:
---

 Summary: previous checkpoint will be broke by the subsequent 
incremental checkpoint
 Key: FLINK-32130
 URL: https://issues.apache.org/jira/browse/FLINK-32130
 Project: Flink
  Issue Type: Bug
Reporter: Feifan Wang


Currently, _SharedStateRegistryImpl_ will discard old one while register new 
state to same key:
{code:java}
// Old entry is not in a confirmed checkpoint yet, and the new one differs.
// This might result from (omitted KG range here for simplicity):
// 1. Flink recovers from a failure using a checkpoint 1
// 2. State Backend is initialized to UID xyz and a set of SST: { 01.sst }
// 3. JM triggers checkpoint 2
// 4. TM sends handle: "xyz-002.sst"; JM registers it under "xyz-002.sst"
// 5. TM crashes; everything is repeated from (2)
// 6. TM recovers from CP 1 again: backend UID "xyz", SST { 01.sst }
// 7. JM triggers checkpoint 3
// 8. TM sends NEW state "xyz-002.sst"
// 9. JM discards it as duplicate
// 10. checkpoint completes, but a wrong SST file is used
// So we use a new entry and discard the old one:
LOG.info(
"Duplicated registration under key {} of a new state: {}. "
+ "This might happen during the task failover if state backend 
creates different states with the same key before and after the failure. "
+ "Discarding the OLD state and keeping the NEW one which is 
included into a completed checkpoint",
registrationKey,
newHandle);
scheduledStateDeletion = entry.stateHandle;
entry.stateHandle = newHandle; {code}
But if _execution.checkpointing.max-concurrent-checkpoints_ > 1, the following 
case will fail (take _RocksDBStateBackend_ as an example):
 # cp1 triggers: 1.sst is uploaded to file-1, <1.sst,file-1> is registered, 
and cp1 references file-1
 # cp1 is not yet complete when cp2 triggers: 1.sst is uploaded to file-2, and 
registering <1.sst,file-2> makes SharedStateRegistry discard file-1
 # cp1 completes and cp2 fails, but cp1 is broken (file-1 has been deleted)

I think we should allow registering multiple state objects under the same key, 
WDYT [~pnowojski], [~roman]  ?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31743) Avoid relocating the RocksDB's log failure when filename exceeds 255 characters

2023-05-08 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31743?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17720736#comment-17720736
 ] 

Feifan Wang commented on FLINK-31743:
-

Thanks [~yunta] , I submitted a new 
[PR|https://github.com/apache/flink/pull/22545] to fix this in 1.17, PTAL.

> Avoid relocating the RocksDB's log failure when filename exceeds 255 
> characters
> ---
>
> Key: FLINK-31743
> URL: https://issues.apache.org/jira/browse/FLINK-31743
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.16.1, 1.15.4
>Reporter: jinghaihang
>Assignee: Feifan Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.2, 1.18.0, 1.17.1
>
>
> Since FLINK-24785 , the file name of the rocksdb LOG is generated by parsing 
> the db path, when the db path is long and the filename exceeds 255 
> characters, the creation of the file will fail, so the relevant rocksdb LOG 
> cannot be seen in the flink log dir.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31139) not upload empty state changelog file

2023-04-26 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17717003#comment-17717003
 ] 

Feifan Wang commented on FLINK-31139:
-

Thanks [~roman] , remember to re-run failed stages for 1.17.

> not upload empty state changelog file
> -
>
> Key: FLINK-31139
> URL: https://issues.apache.org/jira/browse/FLINK-31139
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.15.3, 1.16.1
>Reporter: Feifan Wang
>Assignee: Feifan Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.2, 1.17.1
>
> Attachments: image-2023-02-20-19-51-34-397.png
>
>
> h1. Problem
> *_BatchingStateChangeUploadScheduler_* will upload many empty changelog 
> files (file size == 1, containing only the compressed flag).
> !image-2023-02-20-19-51-34-397.png|width=1062,height=188!
> These files are not referenced by any checkpoint, are not cleaned up, and 
> become more numerous as the job runs. Taking our big job as an example, 2292 
> such files were generated within 7 hours. At that rate it takes only about 4 
> months for the number of files in the changelog directory to exceed a 
> million.
> h1. Problem causes
> This problem is caused by *_BatchingStateChangeUploadScheduler#drainAndSave_* 
> not checking whether the drained task collection is empty. The data in the 
> scheduled queue may already have been uploaded by the time the 
> _*BatchingStateChangeUploadScheduler#drainAndSave*_ method is executed.
>  
> So we should check whether the task collection is empty in 
> *_BatchingStateChangeUploadScheduler#drainAndSave_* . WDYT [~roman] , 
> [~Yanfei Lei] ?
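
A minimal sketch of the guard described above, with simplified types (the 
queue element type and the upload method are stand-ins; the real scheduler in 
Flink wraps this with batching and retry logic):
{code:java}
import java.util.ArrayList;
import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Sketch only: drain the scheduled queue, but skip the upload entirely when a
// concurrent drain already took everything, so no empty changelog file is written.
class UploadSchedulerSketch {

    private final Queue<Object> scheduled = new ConcurrentLinkedQueue<>();

    void drainAndSave() {
        Collection<Object> tasks = new ArrayList<>();
        Object task;
        while ((task = scheduled.poll()) != null) {
            tasks.add(task);
        }
        if (tasks.isEmpty()) {
            return; // the fix: nothing to upload, do not create an empty file
        }
        upload(tasks);
    }

    private void upload(Collection<Object> tasks) {
        // writes one changelog file containing all drained changes (omitted)
    }
}
{code}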



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31900) Fix some typo in java doc, comments and assertion message

2023-04-24 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17716063#comment-17716063
 ] 

Feifan Wang commented on FLINK-31900:
-

Thanks [~Weijie Guo] for review and merge the PR !

> Fix some typo in java doc, comments and assertion message
> -
>
> Key: FLINK-31900
> URL: https://issues.apache.org/jira/browse/FLINK-31900
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Feifan Wang
>Assignee: Feifan Wang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> As the title.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31900) Fix some typo in java doc, comments and assertion message

2023-04-23 Thread Feifan Wang (Jira)
Feifan Wang created FLINK-31900:
---

 Summary: Fix some typo in java doc, comments and assertion message
 Key: FLINK-31900
 URL: https://issues.apache.org/jira/browse/FLINK-31900
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Reporter: Feifan Wang


As the title.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30863) Register local recovery files of changelog before notifyCheckpointComplete()

2023-04-19 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17714392#comment-17714392
 ] 

Feifan Wang commented on FLINK-30863:
-

There are two known issues with dstl file cleanup:
 # local dstl files that are still in use get deleted (as described earlier in 
this ticket)
 # remote dstl files cannot be cleaned up when the checkpoint cannot be 
completed, even if materialization has completed

According to Tang Yun's review feedback, and in order to avoid introducing 
half-baked solutions, I would like to initiate a discussion on dstl file 
cleanup.

*Here is my proposal :*

Divide the deletion of dstl files into two categories (see the sketch after 
this list):
 # *delete by subsumed checkpoint*
 ## remote files : deleted by the JobManager, nothing to do with the 
TaskManager (keep the status quo)
 ## local files : deleted by the LocalChangelogRegistry. A local checkpoint is 
only used for job failover, so only one completed checkpoint needs to be 
retained. I suggest: register the latest checkpointId referencing a dstl file 
with the LocalChangelogRegistry, regardless of whether the checkpoint is 
completed or not. When a checkpoint completes, call 
LocalChangelogRegistry#discardUpToCheckpoint() to delete the local files whose 
latest referencing checkpointId is less than the completed checkpointId. In 
this way, the LocalChangelogRegistry can ensure that the latest completed 
local checkpoint is available, and local files referenced only by non-latest 
checkpoints can be deleted in time. (This PR does exactly that.)
 # *delete by not-used StateChangeSet*
A "not-used StateChangeSet" refers to a StateChangeSet that is not used by any 
checkpoint, or whose using checkpoints have all aborted, and that has already 
been materialized. In this case, remote files and local files should be 
handled uniformly, both by the TaskChangelogRegistry. We could introduce a 
"StateChangeUsageTracker" to find those not-used StateChangeSets based on 
checkpoint execution and materialization progress, and then notify the 
TaskChangelogRegistry to release the reference count of the corresponding 
files.

With the approach described above, the LocalChangelogRegistry is only 
responsible for deleting local files when a checkpoint is subsumed, just as 
implemented in this PR. Since each TaskManager only stores its own checkpoint 
files locally, and these files are cleaned up when the TaskManager exits, I 
think the local dstl file accumulation problem is milder than the remote one. 
So I suggest doing the above modification in two steps :
 # in this ticket, make LocalChangelogRegistry delete local files only when a 
checkpoint completes (just as the PR does)
 # open a new ticket to address the deletion of not-used files, both remote 
and local.
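
A hypothetical sketch of the local-file bookkeeping in category 1; the method 
names follow the text above, and this is not the actual Flink 
LocalChangelogRegistry interface:
{code:java}
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch only: track, per local dstl file, the latest checkpoint referencing
// it, and delete a file once a newer checkpoint has completed.
class LocalChangelogRegistrySketch {

    // local file path -> latest checkpointId referencing it (completed or not)
    private final Map<String, Long> latestReference = new ConcurrentHashMap<>();

    /** Called for each referenced local file on every checkpoint, completed or not. */
    void register(String localFile, long checkpointId) {
        latestReference.merge(localFile, checkpointId, Math::max);
    }

    /** Called when a checkpoint completes: drop files used only by older checkpoints. */
    void discardUpToCheckpoint(long completedCheckpointId) {
        Iterator<Map.Entry<String, Long>> it = latestReference.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() < completedCheckpointId) {
                it.remove();
                deleteLocalFile(entry.getKey()); // safe: a newer completed checkpoint exists
            }
        }
    }

    private void deleteLocalFile(String path) {
        // actual file deletion omitted in this sketch
    }
}
{code}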

[~Yanfei Lei] , [~yunta] , [~roman] , [~pnowojski], [~yuanmei]  WDYT ?

> Register local recovery files of changelog before notifyCheckpointComplete()
> 
>
> Key: FLINK-30863
> URL: https://issues.apache.org/jira/browse/FLINK-30863
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.17.0
>Reporter: Yanfei Lei
>Assignee: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available
> Attachments: tm-log_fail_cl_local_recovery.txt
>
>
> If the TM materializes before receiving confirm(), the previously uploaded 
> queue in `FsStateChangelogWriter` will be cleared, so the local files of the 
> completed checkpoint will not be registered again. The JM-owned files, in 
> contrast, are registered before confirm() and do not depend on the uploaded 
> queue; as a result the local files are deleted while the DFS files are 
> still there. 
>  
> We have encountered the following situation, the job cannot find the local 
> recovery files, but can restore from the DFS files:
> {code:java}
> 2023-01-18 17:21:13,412 [SlidingProcessingTimeWindows (37/48)#1] INFO  
> org.apache.flink.runtime.taskmanager.Task                    [] - 
> SlidingProcessingTimeWindows (37/48)#1 #1 (fa12cfa3b811a351e031b036b0e85d91) 
> switched from DEPLOYING to INITIALIZING.
> 2023-01-18 17:21:13,440 [SlidingProcessingTimeWindows (37/48)#1] INFO  
> org.apache.flink.runtime.state.TaskLocalStateStoreImpl       [] - Found 
> registered local state for checkpoint 11599 in subtask 
> (2daf1d9bc9ed40ecb191303db813b0de - 0a448493b4782967b150582570326227 - 36) : 
> TaskOperatorSubtaskStates{subtaskStatesByOperatorID={0a448493b4782967b150582570326227=SubtaskState{operatorStateFromBackend=StateObjectCollection{[]},
>  operatorStateFromStream=StateObjectCollection{[]}, 
> keyedStateFromBackend=StateObjectCollection{[org.apache.flink.runtime.state.changelog.ChangelogStateBackendLocalHandle@38aa46db]},
>  keyedStateFromStream=StateObjectC

[jira] [Comment Edited] (FLINK-31139) not upload empty state changelog file

2023-03-21 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703441#comment-17703441
 ] 

Feifan Wang edited comment on FLINK-31139 at 3/22/23 2:08 AM:
--

Hi [~roman] , thanks for your reply, and should we merge this fix to 
release-1.16 & release-1.17 ?


was (Author: feifan wang):
Hi [~roman] , should we merge this fix to release-1.16 & release-1.17 ?

> not upload empty state changelog file
> -
>
> Key: FLINK-31139
> URL: https://issues.apache.org/jira/browse/FLINK-31139
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.15.3, 1.16.1
>Reporter: Feifan Wang
>Assignee: Feifan Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.2, 1.17.1
>
> Attachments: image-2023-02-20-19-51-34-397.png
>
>
> h1. Problem
> *_BatchingStateChangeUploadScheduler_* will upload many empty changelog files 
> (file size == 1 byte, containing only the compressed flag).
> !image-2023-02-20-19-51-34-397.png|width=1062,height=188!
> These files are not referenced by any checkpoint, are never cleaned up, and 
> grow in number as the job runs. Taking our big job as an example, 2292 such 
> files were generated within 7 hours; at that rate the number of files in the 
> changelog directory will exceed a million in about 4 months.
> h1. Problem causes
> This problem is caused by *_BatchingStateChangeUploadScheduler#drainAndSave_* 
> not checking whether the drained task collection is empty: by the time the 
> _*BatchingStateChangeUploadScheduler#drainAndSave*_ method runs, the data in 
> the scheduled queue may already have been uploaded.
>  
> So we should check whether the task collection is empty in 
> *_BatchingStateChangeUploadScheduler#drainAndSave_* . WDYT [~roman] , 
> [~Yanfei Lei] ?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31139) not upload empty state changelog file

2023-03-21 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703441#comment-17703441
 ] 

Feifan Wang commented on FLINK-31139:
-

Hi [~roman] , should we merge this fix to release-1.16 & release-1.17 ?

> not upload empty state changelog file
> -
>
> Key: FLINK-31139
> URL: https://issues.apache.org/jira/browse/FLINK-31139
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.15.3, 1.16.1
>Reporter: Feifan Wang
>Assignee: Feifan Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.2, 1.17.1
>
> Attachments: image-2023-02-20-19-51-34-397.png
>
>
> h1. Problem
> *_BatchingStateChangeUploadScheduler_* will upload many empty changelog files 
> (file size == 1 byte, containing only the compressed flag).
> !image-2023-02-20-19-51-34-397.png|width=1062,height=188!
> These files are not referenced by any checkpoint, are never cleaned up, and 
> grow in number as the job runs. Taking our big job as an example, 2292 such 
> files were generated within 7 hours; at that rate the number of files in the 
> changelog directory will exceed a million in about 4 months.
> h1. Problem causes
> This problem is caused by *_BatchingStateChangeUploadScheduler#drainAndSave_* 
> not checking whether the drained task collection is empty: by the time the 
> _*BatchingStateChangeUploadScheduler#drainAndSave*_ method runs, the data in 
> the scheduled queue may already have been uploaded.
>  
> So we should check whether the task collection is empty in 
> *_BatchingStateChangeUploadScheduler#drainAndSave_* . WDYT [~roman] , 
> [~Yanfei Lei] ?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30863) Register local recovery files of changelog before notifyCheckpointComplete()

2023-03-14 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1770#comment-1770
 ] 

Feifan Wang commented on FLINK-30863:
-

Hi [~Yanfei Lei] , sorry for the late reply. I agree to register before 
confirm() to avoid both problems mentioned by you and [~assassinj] . Further, I 
think we should do the registration in persist(), because that is where the 
reference management is originally generated. To achieve this, I suggest 
passing the checkpoint id to the persist() method (a hypothetical sketch 
follows). 
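
A hypothetical sketch of the suggested signature change, for discussion only 
(the real Flink interface differs):
{code:java}
import java.io.IOException;

// Passing the checkpoint id into persist() would let the writer register
// the local dstl files for that checkpoint at the same place where their
// references are produced.
interface ChangelogWriterSketch<Handle> {
    Handle persist(long checkpointId) throws IOException;
}
{code}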

WDYT [~roman] ?

> Register local recovery files of changelog before notifyCheckpointComplete()
> 
>
> Key: FLINK-30863
> URL: https://issues.apache.org/jira/browse/FLINK-30863
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.17.0
>Reporter: Yanfei Lei
>Assignee: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available
> Attachments: tm-log_fail_cl_local_recovery.txt
>
>
> If the TM materializes before receiving confirm(), the previously uploaded 
> queue in `FsStateChangelogWriter` will be cleared, so the local files of the 
> completed checkpoint will not be registered again. The JM-owned files, in 
> contrast, are registered before confirm() and do not depend on the uploaded 
> queue; as a result the local files are deleted while the DFS files are 
> still there. 
>  
> We have encountered the following situation, the job cannot find the local 
> recovery files, but can restore from the DFS files:
> {code:java}
> 2023-01-18 17:21:13,412 [SlidingProcessingTimeWindows (37/48)#1] INFO  
> org.apache.flink.runtime.taskmanager.Task                    [] - 
> SlidingProcessingTimeWindows (37/48)#1 #1 (fa12cfa3b811a351e031b036b0e85d91) 
> switched from DEPLOYING to INITIALIZING.
> 2023-01-18 17:21:13,440 [SlidingProcessingTimeWindows (37/48)#1] INFO  
> org.apache.flink.runtime.state.TaskLocalStateStoreImpl       [] - Found 
> registered local state for checkpoint 11599 in subtask 
> (2daf1d9bc9ed40ecb191303db813b0de - 0a448493b4782967b150582570326227 - 36) : 
> TaskOperatorSubtaskStates{subtaskStatesByOperatorID={0a448493b4782967b150582570326227=SubtaskState{operatorStateFromBackend=StateObjectCollection{[]},
>  operatorStateFromStream=StateObjectCollection{[]}, 
> keyedStateFromBackend=StateObjectCollection{[org.apache.flink.runtime.state.changelog.ChangelogStateBackendLocalHandle@38aa46db]},
>  keyedStateFromStream=StateObjectCollection{[]}, 
> inputChannelState=StateObjectCollection{[]}, 
> resultSubpartitionState=StateObjectCollection{[]}, stateSize=1764644202, 
> checkpointedSize=1997682}}, isTaskDeployedAsFinished=false, 
> isTaskFinished=false}
> 2023-01-18 17:21:13,442 [SlidingProcessingTimeWindows (37/48)#1] INFO  
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - 
> Getting managed memory shared cache for RocksDB.
> 2023-01-18 17:21:13,446 [SlidingProcessingTimeWindows (37/48)#1] INFO  
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - 
> Obtained shared RocksDB cache of size 1438814063 bytes
> 2023-01-18 17:21:13,447 [SlidingProcessingTimeWindows (37/48)#1] INFO  
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation
>  [] - Starting to restore from state handle: 
> IncrementalLocalKeyedStateHandle{metaDataState=File State: 
> file:/opt/flink/flink-tmp-dir/tm_job-2daf1d9b-c9ed-40ec-b191-303db813b0de-taskmanager-1-31/localState/aid_45af7e6b612dad10b60554d81323d5f3/jid_2daf1d9bc9ed40ecb191303db813b0de/vtx_0a448493b4782967b150582570326227_sti_36/chk_125/0d082666-bd31-4ebe-9977-545c0d9b18a5
>  [1187 bytes]} 
> DirectoryKeyedStateHandle{directoryStateHandle=DirectoryStateHandle{directory=/opt/flink/flink-tmp-dir/tm_job-2daf1d9b-c9ed-40ec-b191-303db813b0de-taskmanager-1-31/localState/aid_45af7e6b612dad10b60554d81323d5f3/jid_2daf1d9bc9ed40ecb191303db813b0de/vtx_0a448493b4782967b150582570326227_sti_36/chk_125/b3e1d20f164d4c5baed291f5d1224183},
>  keyGroupRange=KeyGroupRange{startKeyGroup=96, endKeyGroup=98}} without 
> rescaling.
> 2023-01-18 17:21:13,495 [SlidingProcessingTimeWindows (37/48)#1] INFO  
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation
>  [] - Finished restoring from state handle: 
> IncrementalLocalKeyedStateHandle{metaDataState=File State: 
> file:/opt/flink/flink-tmp-dir/tm_job-2daf1d9b-c9ed-40ec-b191-303db813b0de-taskmanager-1-31/localState/aid_45af7e6b612dad10b60554d81323d5f3/jid_2daf1d9bc9ed40ecb191303db813b0de/vtx_0a448493b4782967b150582570326227_sti_36/chk_125/0d082666-bd31-4ebe-9977-545c0d9b18a5
>  [1187 bytes]} 
> DirectoryKeyedStateHandle{directoryStateHandle=DirectoryStateHandle{directory=/opt/flink/flink-tmp-dir

[jira] [Commented] (FLINK-31414) exceptions in the alignment timer are ignored

2023-03-13 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17699928#comment-17699928
 ] 

Feifan Wang commented on FLINK-31414:
-

Thanks for the reply [~pnowojski] , sorry for the lack of clarity in the 
previous description; let me answer your questions first:
{quote}the stack trace doesn't match to the master code, so I'm not sure what 
Flink version you are using?
{quote}
It is based on *release-1.16.1*, with some bug fixes cherry-picked.
{quote}doesn't the error message "switched from RUNNING to FAILED" refer to 
actually subtask/task switching to FAILED state, contradicting your statement 
that the exception is being ignored?
{quote}
Yes, it is a subtask switching to FAILED state. What I mean is that the 
exception thrown in the alignment timer task itself is ignored, so the subtask 
thread continues executing and later triggers the exception I posted above.

Here is a more complete log (I changed some log levels from debug to info):

 
{code:java}
2023-03-10 12:09:42,416 INFO  [MV_J_PV -> mv-join-after-operator -> 
extract-event-identifier (4517/4800)#1] 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler
  - MV_J_PV -> mv-join-after-operator -> extract-event-identifier 
(4517/4800)#1 
(cb2e56879557c676c9897cda44fe3c9e_4f7e0f4c19a43f929bda6907ee1f3150_4516_1): 
Received barrier from channel InputChannelInfo{gateIdx=1, inputChannelIdx=586} 
@ 17.
2023-03-10 12:09:42,673 INFO  [MV_J_PV -> mv-join-after-operator -> 
extract-event-identifier (4517/4800)#1] 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl  - MV_J_PV 
-> mv-join-after-operator -> extract-event-identifier (4517/4800)#1 
starting checkpoint 17 
(CheckpointOptions{checkpointType=CheckpointType{name='Checkpoint', 
sharingFilesStrategy=FORWARD_BACKWARD}, targetLocation=(default), 
alignmentType=UNALIGNED, alignedCheckpointTimeout=9223372036854775807})
2023-03-10 12:09:42,673 INFO  [MV_J_PV -> mv-join-after-operator -> 
extract-event-identifier (4517/4800)#1] 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl  - MV_J_PV 
-> mv-join-after-operator -> extract-event-identifier (4517/4800)#1 put 
ChannelStateWriteResult : 17
2023-03-10 12:09:42,675 INFO  [MV_J_PV -> mv-join-after-operator -> 
extract-event-identifier (4517/4800)#1] 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler
  - MV_J_PV -> mv-join-after-operator -> extract-event-identifier 
(4517/4800)#1 
(cb2e56879557c676c9897cda44fe3c9e_4f7e0f4c19a43f929bda6907ee1f3150_4516_1): 
Triggering checkpoint 17 on the barrier announcement at 1678421367671.
2023-03-10 12:09:42,675 INFO  [MV_J_PV -> mv-join-after-operator -> 
extract-event-identifier (4517/4800)#1] 
org.apache.flink.streaming.runtime.tasks.StreamTask           - 
triggerCheckpointOnBarrier Starting checkpoint 17 
CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD} on 
task MV_J_PV -> mv-join-after-operator -> extract-event-identifier 
(4517/4800)#1
2023-03-10 12:09:42,675 INFO  [MV_J_PV -> mv-join-after-operator -> 
extract-event-identifier (4517/4800)#1] 
org.apache.flink.streaming.runtime.tasks.StreamTask           - Starting 
checkpoint 17 CheckpointType{name='Checkpoint', 
sharingFilesStrategy=FORWARD_BACKWARD} on task MV_J_PV -> 
mv-join-after-operator -> extract-event-identifier (4517/4800)#1
2023-03-10 12:09:42,675 INFO  [MV_J_PV -> mv-join-after-operator -> 
extract-event-identifier (4517/4800)#1] 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl  - MV_J_PV 
-> mv-join-after-operator -> extract-event-identifier (4517/4800)#1 
requested write result, checkpoint 17
2023-03-10 12:09:42,676 INFO  [MV_J_PV -> mv-join-after-operator -> 
extract-event-identifier (4517/4800)#1] 
org.apache.flink.state.changelog.ChangelogKeyedStateBackend   - snapshot of 
MV_J_PV -> mv-join-after-operator -> extract-event-identifier 
(4517/4800)#1 for checkpoint 17, change range: 39..46, materialization ID 4
2023-03-10 12:09:42,677 INFO  [MV_J_PV -> mv-join-after-operator -> 
extract-event-identifier (4517/4800)#1] 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain  - Could not 
complete snapshot 17 for operator MV_J_PV -> mv-join-after-operator -> 
extract-event-identifier (4517/4800)#1. Failure reason: Checkpoint was declined.
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete 
snapshot 17 for operator MV_J_PV -> mv-join-after-operator -> 
extract-event-identifier (4517/4800)#1. Failure reason: Checkpoint was declined.
    at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:269)
    at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173)
    at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:3

[jira] [Updated] (FLINK-31414) exceptions in the alignment timer are ignored

2023-03-13 Thread Feifan Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feifan Wang updated FLINK-31414:

Description: 
Alignment timer task in alternating aligned checkpoint run as a future task in 
mailbox thread, causing the exceptions 
([SingleCheckpointBarrierHandler#registerAlignmentTimer()|https://github.com/apache/flink/blob/65ab8e820a3714d2134dfb4c9772a10c998bd45a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java#L327])
 to be ignored. These exceptions should have failed the task, but now this will 
cause the same checkpoint to fire twice initInputsCheckpoints in my test.

 
{code:java}
 switched from RUNNING to FAILED with failure cause: 
java.lang.RuntimeException: unable to send request to worker
        at 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.enqueue(ChannelStateWriterImpl.java:247)
        at 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.addInputData(ChannelStateWriterImpl.java:161)
        at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.prepareSnapshot(StreamTaskNetworkInput.java:103)
        at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.prepareSnapshot(StreamOneInputProcessor.java:83)
        at 
org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.prepareSnapshot(StreamMultipleInputProcessor.java:122)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.prepareInputSnapshot(StreamTask.java:518)
        at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.prepareInflightDataSnapshot(SubtaskCheckpointCoordinatorImpl.java:655)
        at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.initInputsCheckpoint(SubtaskCheckpointCoordinatorImpl.java:515)
        at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.initInputsCheckpoint(SingleCheckpointBarrierHandler.java:516)
        at 
org.apache.flink.streaming.runtime.io.checkpointing.AlternatingCollectingBarriers.alignmentTimeout(AlternatingCollectingBarriers.java:46)
        at 
org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlternatingAlignedBarrierHandlerState.barrierReceived(AbstractAlternatingAlignedBarrierHandlerState.java:54)
        at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
        at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
        at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
        at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
        at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
        at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
        at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
        at 
org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
        at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
        at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
        at java.lang.Thread.run(Thread.java:748)
        Suppressed: java.io.IOException: java.lang.IllegalStateException: 
writer not found for request start 17
                at 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.close(ChannelStateWriteRequestExecutorImpl.java:175)
                at 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.close(ChannelStateWriterImpl.java:235)
                at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.cancel(SubtaskCheckpointCoordinatorImpl.java:564)
                at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.close(SubtaskCheckpointCoordina

[jira] [Updated] (FLINK-31414) exceptions in the alignment timer are ignored

2023-03-13 Thread Feifan Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feifan Wang updated FLINK-31414:

Description: 
Alignment timer task in alternating aligned checkpoint run as a future task in 
mailbox thread, causing the exceptions 
([SingleCheckpointBarrierHandler#registerAlignmentTimer()|https://github.com/apache/flink/blob/65ab8e820a3714d2134dfb4c9772a10c998bd45a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java#L327])
 to be ignored. These exceptions should have failed the task, but now this will 
cause the same checkpoint to fire twice initInputsCheckpoints.

 

see : 
[BarrierAlignmentUtil#createRegisterTimerCallback()|https://github.com/apache/flink/blob/65ab8e820a3714d2134dfb4c9772a10c998bd45a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierAlignmentUtil.java#L50]

 

  was:
Alignment timer task in alternating aligned checkpoint run as a future task in 
mailbox thread, causing the exceptions 
([SingleCheckpointBarrierHandler#registerAlignmentTimer()|https://github.com/apache/flink/blob/65ab8e820a3714d2134dfb4c9772a10c998bd45a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java#L327])
 to be ignored. These exceptions should have failed the task, but now this will 
cause the same checkpoint to fire twice initInputsCheckpoints.

see : 
[BarrierAlignmentUtil#createRegisterTimerCallback()|https://github.com/apache/flink/blob/65ab8e820a3714d2134dfb4c9772a10c998bd45a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierAlignmentUtil.java#L50]


> exceptions in the alignment timer are ignored
> -
>
> Key: FLINK-31414
> URL: https://issues.apache.org/jira/browse/FLINK-31414
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Reporter: Feifan Wang
>Priority: Major
>
> Alignment timer task in alternating aligned checkpoint run as a future task 
> in mailbox thread, causing the exceptions 
> ([SingleCheckpointBarrierHandler#registerAlignmentTimer()|https://github.com/apache/flink/blob/65ab8e820a3714d2134dfb4c9772a10c998bd45a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java#L327])
>  to be ignored. These exceptions should have failed the task, but now this 
> will cause the same checkpoint to fire twice initInputsCheckpoints.
>  
> see : 
> [BarrierAlignmentUtil#createRegisterTimerCallback()|https://github.com/apache/flink/blob/65ab8e820a3714d2134dfb4c9772a10c998bd45a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierAlignmentUtil.java#L50]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31414) exceptions in the alignment timer are ignored

2023-03-13 Thread Feifan Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feifan Wang updated FLINK-31414:

Description: 
Alignment timer task in alternating aligned checkpoint run as a future task in 
mailbox thread, causing the exceptions 
([SingleCheckpointBarrierHandler#registerAlignmentTimer()|https://github.com/apache/flink/blob/65ab8e820a3714d2134dfb4c9772a10c998bd45a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java#L327])
 to be ignored. These exceptions should have failed the task, but now this will 
cause the same checkpoint to fire twice initInputsCheckpoints.

see : 
[BarrierAlignmentUtil#createRegisterTimerCallback()|https://github.com/apache/flink/blob/65ab8e820a3714d2134dfb4c9772a10c998bd45a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierAlignmentUtil.java#L50]

  was:
Alignment timer task in alternating aligned checkpoint run as a future task in 
mailbox thread, leads to exceptions 
([SingleCheckpointBarrierHandler#registerAlignmentTimer()|https://github.com/apache/flink/blob/65ab8e820a3714d2134dfb4c9772a10c998bd45a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java#L327])
 are ignored. These exceptions should have failed task.

see : 
[BarrierAlignmentUtil#createRegisterTimerCallback()|https://github.com/apache/flink/blob/65ab8e820a3714d2134dfb4c9772a10c998bd45a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierAlignmentUtil.java#L50]


> exceptions in the alignment timer are ignored
> -
>
> Key: FLINK-31414
> URL: https://issues.apache.org/jira/browse/FLINK-31414
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Reporter: Feifan Wang
>Priority: Major
>
> Alignment timer task in alternating aligned checkpoint run as a future task 
> in mailbox thread, causing the exceptions 
> ([SingleCheckpointBarrierHandler#registerAlignmentTimer()|https://github.com/apache/flink/blob/65ab8e820a3714d2134dfb4c9772a10c998bd45a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java#L327])
>  to be ignored. These exceptions should have failed the task, but now this 
> will cause the same checkpoint to fire twice initInputsCheckpoints.
> see : 
> [BarrierAlignmentUtil#createRegisterTimerCallback()|https://github.com/apache/flink/blob/65ab8e820a3714d2134dfb4c9772a10c998bd45a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierAlignmentUtil.java#L50]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31414) exceptions in the alignment timer are ignored

2023-03-13 Thread Feifan Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feifan Wang updated FLINK-31414:

Description: 
Alignment timer task in alternating aligned checkpoint run as a future task in 
mailbox thread, leads to exceptions 
([SingleCheckpointBarrierHandler#registerAlignmentTimer()|https://github.com/apache/flink/blob/65ab8e820a3714d2134dfb4c9772a10c998bd45a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java#L327])
 are ignored. These exceptions should have failed task.

see : 
[BarrierAlignmentUtil#createRegisterTimerCallback()|https://github.com/apache/flink/blob/65ab8e820a3714d2134dfb4c9772a10c998bd45a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierAlignmentUtil.java#L50]

  was:
Alignment timer task in alternating aligned checkpoint run as a future task in 
mailbox thread, leads to exceptions 
([SingleCheckpointBarrierHandler#registerAlignmentTimer()|https://github.com/apache/flink/blob/65ab8e820a3714d2134dfb4c9772a10c998bd45a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java#L327])
 are ignored. 

see : 
[BarrierAlignmentUtil#createRegisterTimerCallback()|https://github.com/apache/flink/blob/65ab8e820a3714d2134dfb4c9772a10c998bd45a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierAlignmentUtil.java#L50]


> exceptions in the alignment timer are ignored
> -
>
> Key: FLINK-31414
> URL: https://issues.apache.org/jira/browse/FLINK-31414
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Reporter: Feifan Wang
>Priority: Major
>
> Alignment timer task in alternating aligned checkpoint run as a future task 
> in mailbox thread, leads to exceptions 
> ([SingleCheckpointBarrierHandler#registerAlignmentTimer()|https://github.com/apache/flink/blob/65ab8e820a3714d2134dfb4c9772a10c998bd45a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java#L327])
>  are ignored. These exceptions should have failed task.
> see : 
> [BarrierAlignmentUtil#createRegisterTimerCallback()|https://github.com/apache/flink/blob/65ab8e820a3714d2134dfb4c9772a10c998bd45a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierAlignmentUtil.java#L50]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31414) exceptions in the alignment timer are ignored

2023-03-13 Thread Feifan Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feifan Wang updated FLINK-31414:

Description: 
Alignment timer task in alternating aligned checkpoint run as a future task in 
mailbox thread, leads to exceptions 
([SingleCheckpointBarrierHandler#registerAlignmentTimer()|https://github.com/apache/flink/blob/65ab8e820a3714d2134dfb4c9772a10c998bd45a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java#L327])
 are ignored. 

see : 
[BarrierAlignmentUtil#createRegisterTimerCallback()|https://github.com/apache/flink/blob/65ab8e820a3714d2134dfb4c9772a10c998bd45a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierAlignmentUtil.java#L50]

  was:
Alignment timer task in alternating aligned checkpoint run as a future task in 
mailbox thread, leads to exceptions 
([SingleCheckpointBarrierHandler#registerAlignmentTimer()|https://github.com/apache/flink/blob/65ab8e820a3714d2134dfb4c9772a10c998bd45a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java#L327])
 are ignored.

see : 
[BarrierAlignmentUtil#createRegisterTimerCallback()|https://github.com/apache/flink/blob/65ab8e820a3714d2134dfb4c9772a10c998bd45a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierAlignmentUtil.java#L50]


> exceptions in the alignment timer are ignored
> -
>
> Key: FLINK-31414
> URL: https://issues.apache.org/jira/browse/FLINK-31414
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Reporter: Feifan Wang
>Priority: Major
>
> Alignment timer task in alternating aligned checkpoint run as a future task 
> in mailbox thread, leads to exceptions 
> ([SingleCheckpointBarrierHandler#registerAlignmentTimer()|https://github.com/apache/flink/blob/65ab8e820a3714d2134dfb4c9772a10c998bd45a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java#L327])
>  are ignored. 
> see : 
> [BarrierAlignmentUtil#createRegisterTimerCallback()|https://github.com/apache/flink/blob/65ab8e820a3714d2134dfb4c9772a10c998bd45a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierAlignmentUtil.java#L50]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31414) exceptions in the alignment timer are ignored

2023-03-13 Thread Feifan Wang (Jira)
Feifan Wang created FLINK-31414:
---

 Summary: exceptions in the alignment timer are ignored
 Key: FLINK-31414
 URL: https://issues.apache.org/jira/browse/FLINK-31414
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Reporter: Feifan Wang


The alignment timer task in an alternating aligned checkpoint runs as a future 
task in the mailbox thread, which leads to exceptions 
([SingleCheckpointBarrierHandler#registerAlignmentTimer()|https://github.com/apache/flink/blob/65ab8e820a3714d2134dfb4c9772a10c998bd45a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java#L327])
 being ignored.

see : 
[BarrierAlignmentUtil#createRegisterTimerCallback()|https://github.com/apache/flink/blob/65ab8e820a3714d2134dfb4c9772a10c998bd45a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierAlignmentUtil.java#L50]
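
To illustrate the failure mode, here is a minimal, self-contained example 
(illustration only, not Flink code): an exception thrown inside a deferred 
task is captured by the future that wraps it, and if nothing ever joins or 
handles that future, the error is silently dropped instead of failing the task.
{code:java}
import java.util.concurrent.CompletableFuture;

public class SwallowedTimerException {
    public static void main(String[] args) throws InterruptedException {
        // The "timer callback" throws, but the exception is stored inside
        // the CompletableFuture rather than propagated to any thread.
        CompletableFuture<Void> timerTask =
                CompletableFuture.runAsync(() -> {
                    throw new IllegalStateException("alignment timeout handling failed");
                });

        Thread.sleep(100); // give the async task time to run and fail

        // Nobody calls timerTask.join() or attaches exceptionally(...),
        // so the failure never reaches the thread that should fail the task.
        System.out.println("main keeps running; completedExceptionally = "
                + timerTask.isCompletedExceptionally());
    }
}
{code}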



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31139) not upload empty state changelog file

2023-02-20 Thread Feifan Wang (Jira)
Feifan Wang created FLINK-31139:
---

 Summary: not upload empty state changelog file
 Key: FLINK-31139
 URL: https://issues.apache.org/jira/browse/FLINK-31139
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Reporter: Feifan Wang
 Fix For: 1.16.2
 Attachments: image-2023-02-20-19-51-34-397.png

h1. Problem

*_BatchingStateChangeUploadScheduler_* will upload many empty changelog files 
(file size == 1 byte, containing only the compressed flag).

!image-2023-02-20-19-51-34-397.png|width=1062,height=188!

These files are not referenced by any checkpoint, are never cleaned up, and 
grow in number as the job runs. Taking our big job as an example, 2292 such 
files were generated within 7 hours; at that rate the number of files in the 
changelog directory will exceed a million in about 4 months.
h1. Problem causes

This problem is caused by *_BatchingStateChangeUploadScheduler#drainAndSave_* 
not checking whether the drained task collection is empty: by the time the 
_*BatchingStateChangeUploadScheduler#drainAndSave*_ method runs, the data in 
the scheduled queue may already have been uploaded.

 

So we should check whether the task collection is empty in 
*_BatchingStateChangeUploadScheduler#drainAndSave_* (a sketch of the proposed 
guard follows). WDYT [~roman] , [~Yanfei Lei] ?
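
A minimal sketch of the proposed guard; the queue and upload() are hypothetical 
stand-ins, not the real BatchingStateChangeUploadScheduler internals:
{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class DrainAndSaveSketch {

    private final Queue<Runnable> scheduled = new ConcurrentLinkedQueue<>();

    void drainAndSave() {
        // Drain whatever is currently scheduled.
        List<Runnable> tasks = new ArrayList<>();
        Runnable task;
        while ((task = scheduled.poll()) != null) {
            tasks.add(task);
        }
        // Proposed guard: the queue may already have been drained by an
        // earlier upload; without this check, an "empty" changelog file
        // (1 byte, compressed flag only) would still be written.
        if (tasks.isEmpty()) {
            return;
        }
        upload(tasks);
    }

    private void upload(List<Runnable> tasks) {
        // write all drained changes into one changelog file (omitted)
    }
}
{code}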



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-30792) clean up not uploaded state changes after materialization complete

2023-02-16 Thread Feifan Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feifan Wang closed FLINK-30792.
---
Fix Version/s: 1.17.0
   1.16.2
   Resolution: Duplicate

> clean up not uploaded state changes after materialization complete
> --
>
> Key: FLINK-30792
> URL: https://issues.apache.org/jira/browse/FLINK-30792
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.16.0
>Reporter: Feifan Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.16.2
>
> Attachments: image-2023-02-03-11-30-40-198.png
>
>
> We should clean up not uploaded state changes after materialization 
> completes, otherwise it may cause a FileNotFoundException.
> Since state changes from before a completed materialization in 
> FsStateChangelogWriter#notUploaded will not be used by any subsequent 
> checkpoint, I suggest cleaning them up while handling the materialization 
> result. 
> What do you think about this ? [~ym] , [~roman] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30792) clean up not uploaded state changes after materialization complete

2023-02-09 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17686891#comment-17686891
 ] 

Feifan Wang commented on FLINK-30792:
-

Thanks for your reply [~roman] , you are right: reference counting state 
changes per state handle can indeed solve the changelog-file-not-found problem 
mentioned above. The changes in this PR are only intended to reduce useless 
data uploads.

As for the performance regression you mentioned, I really hadn't thought about 
it carefully before, and I'm still not sure whether this would cause one. On 
the one hand, it will reduce the total amount of data uploaded; on the other 
hand, it will require more data to be uploaded at the moment a checkpoint is 
triggered.

> clean up not uploaded state changes after materialization complete
> --
>
> Key: FLINK-30792
> URL: https://issues.apache.org/jira/browse/FLINK-30792
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.16.0
>Reporter: Feifan Wang
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2023-02-03-11-30-40-198.png
>
>
> We should clean up not uploaded state changes after materialization 
> completes, otherwise it may cause a FileNotFoundException.
> Since state changes from before a completed materialization in 
> FsStateChangelogWriter#notUploaded will not be used by any subsequent 
> checkpoint, I suggest cleaning them up while handling the materialization 
> result. 
> What do you think about this ? [~ym] , [~roman] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28440) EventTimeWindowCheckpointingITCase failed with restore

2023-02-02 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17683655#comment-17683655
 ] 

Feifan Wang commented on FLINK-28440:
-

Hi [~ym] , [~roman] , I communicated with [~Yanfei Lei] offline; we think there 
are two situations that can cause the changelog file not found problem.
h2. Case-1 :

The first is the “First case” I mentioned above:

!image-2023-02-03-12-03-16-155.png|width=708,height=315!

 
 # checkpoint-1 triggered and completed, upTo sqn=1
 # materialization-1 triggered, upTo sqn=2. ChangeSet[sqn=1] will be added to 
{*}_notUploaded_{*}.
 # materialization-1 completed, upTo sqn=3
 # checkpoint-2 triggered and completed, upTo sqn=3. Since materialization-1 had 
completed when checkpoint-2 was triggered, checkpoint-2 starts from sqn=2. 
ChangeSet[sqn=1] stays in notUploaded.
 # checkpoint-3 triggered and completed, upTo sqn=4.
 # before chk-2 is subsumed, ChangeSet[sqn=4] triggers a pre-emptive upload; 
ChangeSet[sqn=1] and ChangeSet[sqn=4] are saved in the same file.
 # checkpoint-2 subsumed, FsStateChangelogWriter tries to truncate to sqn=2. 
ChangeSet[sqn=1] is deleted, but ChangeSet[sqn=4], which is in the same file, 
is deleted along with it.
 # checkpoint-4 triggered and completed; it contains changelog [sqn=2 ~ sqn=4], 
but ChangeSet[sqn=4] has already been deleted.

h2. Case-2 :

Case-2 was provided by Yanfei:

!image-2023-02-03-12-03-56-614.png|width=632,height=308!
 # checkpoint-1 triggered, upTo sqn=1, but BatchingStateChangeUploadScheduler is 
waiting for persist-delay
 # ChangeSet[sqn=1] triggers a pre-emptive upload; ChangeSet[sqn=0] and 
ChangeSet[sqn=1] are saved in the same file, and checkpoint-1 completes.
 # the task is canceled for other reasons before checkpoint-1 is confirmed 
(the JobManager completed checkpoint-1, but the confirm message has not yet 
reached the task). FsStateChangelogWriter tries to truncateAndClose from sqn=1; 
ChangeSet[sqn=1] is deleted, but ChangeSet[sqn=0], which is in the same file, 
is deleted along with it.

h2. How to reproduce :

FsStateChangelogWriterTest#testChangelogFileAvailable and 
FsStateChangelogWriterTest#testChangelogFileAvailableAgain [in this 
PR|https://github.com/apache/flink/pull/21812] correspond to case-1 and case-2 
above respectively.
h2. Problem analysis :

As I mentioned above, I think the source of the problem is 
FsStateChangelogWriter#notifyStateNotUsed. A StreamStateHandle may correspond 
to multiple UploadResults, so we cannot assume that the entire 
StreamStateHandle is no longer needed by this backend just because a single 
UploadResult is no longer used.

!image-2023-02-01-01-19-12-182.png|width=652,height=268!
h2. Proposal :

I propose to record the reference count of the StreamStateHandle in the 
TaskChangelogRegistry instead of in the backend collection. 
TaskChangelogRegistry#notUsed(streamStateHandle, uploadResult) only decrements 
the reference count by one, and the StreamStateHandle is deleted when the 
reference count reaches zero (a sketch follows).
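
A hypothetical sketch of the proposed reference counting; the type and method 
names are illustrative, not the actual TaskChangelogRegistry API:
{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class RefCountingRegistrySketch<Handle> {

    private final Map<Handle, Integer> refCounts = new ConcurrentHashMap<>();

    // Track one reference per UploadResult that points into the handle's file.
    void startTracking(Handle handle, int numberOfUploadResults) {
        refCounts.merge(handle, numberOfUploadResults, Integer::sum);
    }

    // Decrement by one when a single UploadResult becomes unused; delete
    // the underlying file only when no UploadResult references it anymore.
    void notUsed(Handle handle) {
        refCounts.computeIfPresent(handle, (h, count) -> {
            if (count <= 1) {
                discard(h);
                return null; // remove the mapping: count reached zero
            }
            return count - 1;
        });
    }

    private void discard(Handle handle) {
        // delete the underlying changelog file (omitted)
    }
}
{code}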

What do you think  [~ym] , [~roman]  ?

> EventTimeWindowCheckpointingITCase failed with restore
> --
>
> Key: FLINK-28440
> URL: https://issues.apache.org/jira/browse/FLINK-28440
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Yanfei Lei
>Priority: Critical
>  Labels: auto-deprioritized-critical, pull-request-available, 
> test-stability
> Attachments: image-2023-02-01-00-51-54-506.png, 
> image-2023-02-01-01-10-01-521.png, image-2023-02-01-01-19-12-182.png, 
> image-2023-02-01-16-47-23-756.png, image-2023-02-01-16-57-43-889.png, 
> image-2023-02-02-10-52-56-599.png, image-2023-02-03-10-09-07-586.png, 
> image-2023-02-03-12-03-16-155.png, image-2023-02-03-12-03-56-614.png
>
>
> {code:java}
> Caused by: java.lang.Exception: Exception while creating 
> StreamOperatorStateContext.
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:256)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:722)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:698)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:665)
>   at 
> org.apache.flink.runtim

[jira] [Updated] (FLINK-28440) EventTimeWindowCheckpointingITCase failed with restore

2023-02-02 Thread Feifan Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feifan Wang updated FLINK-28440:

Attachment: image-2023-02-03-12-03-56-614.png

> EventTimeWindowCheckpointingITCase failed with restore
> --
>
> Key: FLINK-28440
> URL: https://issues.apache.org/jira/browse/FLINK-28440
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Yanfei Lei
>Priority: Critical
>  Labels: auto-deprioritized-critical, pull-request-available, 
> test-stability
> Attachments: image-2023-02-01-00-51-54-506.png, 
> image-2023-02-01-01-10-01-521.png, image-2023-02-01-01-19-12-182.png, 
> image-2023-02-01-16-47-23-756.png, image-2023-02-01-16-57-43-889.png, 
> image-2023-02-02-10-52-56-599.png, image-2023-02-03-10-09-07-586.png, 
> image-2023-02-03-12-03-16-155.png, image-2023-02-03-12-03-56-614.png
>
>
> {code:java}
> Caused by: java.lang.Exception: Exception while creating 
> StreamOperatorStateContext.
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:256)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:722)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:698)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:665)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for WindowOperator_0a448493b4782967b150582570326227_(2/4) from 
> any of the 1 provided restore options.
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:353)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:165)
>   ... 11 more
> Caused by: java.lang.RuntimeException: java.io.FileNotFoundException: 
> /tmp/junit1835099326935900400/junit1113650082510421526/52ee65b7-033f-4429-8ddd-adbe85e27ced
>  (No such file or directory)
>   at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
>   at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:87)
>   at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.hasNext(StateChangelogHandleStreamHandleReader.java:69)
>   at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.readBackendHandle(ChangelogBackendRestoreOperation.java:96)
>   at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:75)
>   at 
> org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:92)
>   at 
> org.apache.flink.state.changelog.AbstractChangelogStateBackend.createKeyedStateBackend(AbstractChangelogStateBackend.java:136)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:336)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>   ... 13 more
> Caused by: java.io.FileNotFoundException: 
> /tmp/junit1835099326935900400/junit1113650082510421526/52ee65b7-033f-4429-8ddd-adbe85e27ced
>  (No such file or directory)
>   at java.io.FileInputStream.open0(Native Method)
>   at java.io.FileInputStream.open(FileInpu

[jira] [Updated] (FLINK-28440) EventTimeWindowCheckpointingITCase failed with restore

2023-02-02 Thread Feifan Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feifan Wang updated FLINK-28440:

Attachment: image-2023-02-03-12-03-16-155.png

> EventTimeWindowCheckpointingITCase failed with restore
> --
>
> Key: FLINK-28440
> URL: https://issues.apache.org/jira/browse/FLINK-28440
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Yanfei Lei
>Priority: Critical
>  Labels: auto-deprioritized-critical, pull-request-available, 
> test-stability
> Attachments: image-2023-02-01-00-51-54-506.png, 
> image-2023-02-01-01-10-01-521.png, image-2023-02-01-01-19-12-182.png, 
> image-2023-02-01-16-47-23-756.png, image-2023-02-01-16-57-43-889.png, 
> image-2023-02-02-10-52-56-599.png, image-2023-02-03-10-09-07-586.png, 
> image-2023-02-03-12-03-16-155.png, image-2023-02-03-12-03-56-614.png
>
>
> {code:java}
> Caused by: java.lang.Exception: Exception while creating 
> StreamOperatorStateContext.
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:256)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:722)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:698)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:665)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for WindowOperator_0a448493b4782967b150582570326227_(2/4) from 
> any of the 1 provided restore options.
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:353)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:165)
>   ... 11 more
> Caused by: java.lang.RuntimeException: java.io.FileNotFoundException: 
> /tmp/junit1835099326935900400/junit1113650082510421526/52ee65b7-033f-4429-8ddd-adbe85e27ced
>  (No such file or directory)
>   at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
>   at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:87)
>   at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.hasNext(StateChangelogHandleStreamHandleReader.java:69)
>   at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.readBackendHandle(ChangelogBackendRestoreOperation.java:96)
>   at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:75)
>   at 
> org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:92)
>   at 
> org.apache.flink.state.changelog.AbstractChangelogStateBackend.createKeyedStateBackend(AbstractChangelogStateBackend.java:136)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:336)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>   ... 13 more
> Caused by: java.io.FileNotFoundException: 
> /tmp/junit1835099326935900400/junit1113650082510421526/52ee65b7-033f-4429-8ddd-adbe85e27ced
>  (No such file or directory)
>   at java.io.FileInputStream.open0(Native Method)
>   at java.io.FileInputStream.open(FileInpu

[jira] [Comment Edited] (FLINK-28440) EventTimeWindowCheckpointingITCase failed with restore

2023-02-01 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682677#comment-17682677
 ] 

Feifan Wang edited comment on FLINK-28440 at 2/2/23 2:54 AM:
-

Hi [~Yanfei Lei] , [~ym] , [~roman] , we also encountered the changelog 
FileNotFoundException problem during restore. After a period of investigation, 
I found two causes for this problem, both involving pre-emptive upload and the 
BatchingStateChangeUploadScheduler :
h3. *First case :*

Pre-emptive upload causes the stale StateChangeSet to be uploaded into the same 
file along with the latest StateChangeSet.

!image-2023-02-01-00-51-54-506.png|width=617,height=342!

The small square with a number in the middle represents a StateChange, and the 
number in it indicates the SequenceNumber to which the StateChange belongs.

StateChange-3 triggers a pre-emptive upload, and then StateChange-1 and 
StateChange-3 are uploaded to the same file.
{code:java}
if (activeChangeSetSize >= preEmptivePersistThresholdInBytes) {
    LOG.info(
            "pre-emptively flush {}MB of appended changes to the common store",
            activeChangeSetSize / 1024 / 1024);
    persistInternal(
            notUploaded.isEmpty() ? activeSequenceNumber : notUploaded.firstKey());
}
{code}
When checkpoint-3 is completed, checkpoint-2 will be subsumed. Then 
StateChange-1 will be truncated, further causing file-3 to be deleted. If 
restoring from checkpoint-3, a FileNotFoundException will occur.
h3. *Second case :*

!image-2023-02-01-01-10-01-521.png|width=484,height=391!

StateChange-0,1,2 are uploaded into the same file due to 
dstl.dfs.batch.persist-delay and pre-emptive upload. Checkpoint-1 will be 
subsumed after checkpoint-2 completes, and file-1 will then be deleted. But 
checkpoint-2 actually still needs file-1 (checkpoint-2 needs StateChange-2).
h3. *Summarize :*

The root of the problem is FsStateChangelogWriter#notifyStateNotUsed.

!image-2023-02-01-01-19-12-182.png|width=759,height=312!

 

I wrote two test cases to reproduce the above problem, namely 
_FsStateChangelogWriterTest#testChangelogFileNotFound1()_ and 
{_}FsStateChangelogWriterTest#testChangelogFileNotFound2(){_}; [you can find 
them here|https://github.com/apache/flink/pull/21812].

 

I suggest recording the largest SequenceNumber of each StreamStateHandle in the 
FsStateChangelogWriter and checking it before calling 
changelogRegistry.notUsed(). 

WDYT [~ym] ,[~roman] ,[~Yanfei Lei]  ?


was (Author: feifan wang):
Hi [~Yanfei Lei] ,[~ym] ,[~roman] , We also encountered the problem of 
changelog FileNotFoundException during restore. After a period of 
investigation, I found two reasons for this problem. They are all caused by 
pre-emptive upload and BatchingStateChangeUploadScheduler :
h3. *First case :*

Pre-emptive upload causes the stale StateChangeSet to be uploaded into the same 
file along with the latest StateChangeSet.

!image-2023-02-01-00-51-54-506.png|width=617,height=342!

The small square with a number in the middle represents a StateChange, and the 
number in it indicates the SequenceNumber to which the StateChange belongs.

StateChange-3 triggers a pre-emptive upload, and then StateChange-1 and 
StateChange-3 are uploaded to the same file.
{code:java}
if (activeChangeSetSize >= preEmptivePersistThresholdInBytes) {
LOG.info(
"pre-emptively flush {}MB of appended changes to the common store",
activeChangeSetSize / 1024 / 1024);
persistInternal(notUploaded.isEmpty() ? activeSequenceNumber : 
notUploaded.firstKey());
} {code}
When checkpoint-3 is completed, checkpoint-2 will be resumed. When checkpoint-3 
is completed, checkpoint-2 will be resumed. Then StateChange-1 will be 
truncate, further causing file-3 to be deleted. If restoring from checkpoint-3, 
a FileNotFoundException will occur.
h3. *Second case :*

!image-2023-02-01-01-10-01-521.png|width=484,height=391!

StateChange-0,1,2 are uploaded in the same file due to 
dstl.dfs.batch.persist-delay and pre-emptive. Checkpoint-1 will be resume after 
checkpoint-2 complete, then file-1 will be delete. But actually checkpoint-2 
needs file-1 ( checkpoint-2 needs StateChange-2 ).
h3. *Summarize :*

The root of the problem is FsStateChangelogWriter#notifyStateNotUsed.

!image-2023-02-01-01-19-12-182.png|width=759,height=312!

 

I wrote two test cases to reproduce the above problem, namely 
_FsStateChangelogWriterTest#testChangelogFileNotFound1()_ and 
{_}FsStateChangelogWriterTest#testChangelogFileNotFound2(){_}, [you can find it 
here|https://github.com/apache/flink/pull/21812].

 

I suggest to record the largest SequenceNumber in each StreamStateHandle in the 
FsStateChangelogWriter and checking it before calling 
changelogRegistry.notUsed(). 

WDYT [~ym] ,[~roman] ,[~Yanfei Lei]  ?

> EventTimeWindowCheckpointingITCase failed with r

[jira] [Comment Edited] (FLINK-28440) EventTimeWindowCheckpointingITCase failed with restore

2023-02-01 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682912#comment-17682912
 ] 

Feifan Wang edited comment on FLINK-28440 at 2/2/23 2:53 AM:
-

The updated _FsStateChangelogWriterTest#testChangelogFileNotFound1_ can be 
described by the following figure :

!image-2023-02-02-10-52-56-599.png|width=600,height=273!

When materialization-1 is triggered, ChangeSet-1 will be added to notUploaded, 
and it will remain there until ChangeSet-4 triggers a pre-emptive upload and 
is uploaded into the same file as ChangeSet-4.
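A rough, self-contained sketch of that timeline (the class and variable names below are illustrative, not the real test's API):
{code:java}
import java.util.NavigableMap;
import java.util.TreeMap;

public class NotUploadedTimeline {
    public static void main(String[] args) {
        // Change sets appended but not yet uploaded, keyed by SequenceNumber.
        NavigableMap<Integer, String> notUploaded = new TreeMap<>();

        notUploaded.put(1, "ChangeSet-1"); // persisted by materialization-1, stays pending
        notUploaded.put(4, "ChangeSet-4"); // large enough to trigger pre-emptive upload

        // The pre-emptive upload persists from the oldest pending change set,
        // so ChangeSet-1 and ChangeSet-4 end up in the same file.
        System.out.println("uploaded together: " + notUploaded.values());

        // Truncating after materialization-1 must therefore not delete that
        // shared file, or a later restore hits FileNotFoundException.
    }
}
{code}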

 

@ [~Yanfei Lei] 


was (Author: feifan wang):
The updated _FsStateChangelogWriterTest#testChangelogFileNotFound1_ can be 
described by the following figure :

!image-2023-02-01-16-57-43-889.png|width=697,height=326!

When materialization-1 is triggered, ChangeSet-1 will be added to notUploaded, 
and it will remain there until ChangeSet-4 triggers pre-emptive upload and is 
uploaded to the same file with ChangeSet-4.

 

@ [~Yanfei Lei] 

> EventTimeWindowCheckpointingITCase failed with restore
> --
>
> Key: FLINK-28440
> URL: https://issues.apache.org/jira/browse/FLINK-28440
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Yanfei Lei
>Priority: Critical
>  Labels: auto-deprioritized-critical, pull-request-available, 
> test-stability
> Attachments: image-2023-02-01-00-51-54-506.png, 
> image-2023-02-01-01-10-01-521.png, image-2023-02-01-01-19-12-182.png, 
> image-2023-02-01-16-47-23-756.png, image-2023-02-01-16-57-43-889.png, 
> image-2023-02-02-10-52-56-599.png
>
>
> {code:java}
> Caused by: java.lang.Exception: Exception while creating 
> StreamOperatorStateContext.
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:256)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:722)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:698)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:665)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for WindowOperator_0a448493b4782967b150582570326227_(2/4) from 
> any of the 1 provided restore options.
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:353)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:165)
>   ... 11 more
> Caused by: java.lang.RuntimeException: java.io.FileNotFoundException: 
> /tmp/junit1835099326935900400/junit1113650082510421526/52ee65b7-033f-4429-8ddd-adbe85e27ced
>  (No such file or directory)
>   at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
>   at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:87)
>   at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.hasNext(StateChangelogHandleStreamHandleReader.java:69)
>   at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.readBackendHandle(ChangelogBackendRestoreOperation.java:96)
>   at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:75)
>   at 
> org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:92)
>   at 
> org.apache.flink.state.changelog.AbstractChangelogStateBackend.createKeyedStateBa

[jira] [Comment Edited] (FLINK-28440) EventTimeWindowCheckpointingITCase failed with restore

2023-02-01 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682912#comment-17682912
 ] 

Feifan Wang edited comment on FLINK-28440 at 2/1/23 11:34 AM:
--

The updated _FsStateChangelogWriterTest#testChangelogFileNotFound1_ can be 
described by the following figure :

!image-2023-02-01-16-57-43-889.png|width=697,height=326!

When materialization-1 is triggered, ChangeSet-1 will be added to notUploaded, 
and it will remain there until ChangeSet-4 triggers pre-emptive upload and is 
uploaded to the same file with ChangeSet-4.

 

@ [~Yanfei Lei] 


was (Author: feifan wang):
The updated _FsStateChangelogWriterTest#testChangelogFileNotFound1_ can be 
described by the following figure :

!image-2023-02-01-16-57-43-889.png|width=697,height=326!

 

@ [~Yanfei Lei] 

> EventTimeWindowCheckpointingITCase failed with restore
> --
>
> Key: FLINK-28440
> URL: https://issues.apache.org/jira/browse/FLINK-28440
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Yanfei Lei
>Priority: Critical
>  Labels: auto-deprioritized-critical, pull-request-available, 
> test-stability
> Attachments: image-2023-02-01-00-51-54-506.png, 
> image-2023-02-01-01-10-01-521.png, image-2023-02-01-01-19-12-182.png, 
> image-2023-02-01-16-47-23-756.png, image-2023-02-01-16-57-43-889.png
>
>
> {code:java}
> Caused by: java.lang.Exception: Exception while creating 
> StreamOperatorStateContext.
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:256)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:722)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:698)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:665)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for WindowOperator_0a448493b4782967b150582570326227_(2/4) from 
> any of the 1 provided restore options.
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:353)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:165)
>   ... 11 more
> Caused by: java.lang.RuntimeException: java.io.FileNotFoundException: 
> /tmp/junit1835099326935900400/junit1113650082510421526/52ee65b7-033f-4429-8ddd-adbe85e27ced
>  (No such file or directory)
>   at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
>   at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:87)
>   at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.hasNext(StateChangelogHandleStreamHandleReader.java:69)
>   at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.readBackendHandle(ChangelogBackendRestoreOperation.java:96)
>   at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:75)
>   at 
> org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:92)
>   at 
> org.apache.flink.state.changelog.AbstractChangelogStateBackend.createKeyedStateBackend(AbstractChangelogStateBackend.java:136)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:336)
>   at 
> org.apache.flink.streamin

[jira] [Comment Edited] (FLINK-28440) EventTimeWindowCheckpointingITCase failed with restore

2023-02-01 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682912#comment-17682912
 ] 

Feifan Wang edited comment on FLINK-28440 at 2/1/23 8:59 AM:
-

The updated _FsStateChangelogWriterTest#testChangelogFileNotFound1_ can be 
described by the following figure :

!image-2023-02-01-16-57-43-889.png|width=697,height=326!

 

@ [~Yanfei Lei] 


was (Author: feifan wang):
The updated _FsStateChangelogWriterTest#testChangelogFileNotFound1_ can be 
described by the following figure :

!image-2023-02-01-16-47-23-756.png|width=715,height=331!

 

[~Yanfei Lei] 

> EventTimeWindowCheckpointingITCase failed with restore
> --
>
> Key: FLINK-28440
> URL: https://issues.apache.org/jira/browse/FLINK-28440
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Yanfei Lei
>Priority: Critical
>  Labels: auto-deprioritized-critical, pull-request-available, 
> test-stability
> Attachments: image-2023-02-01-00-51-54-506.png, 
> image-2023-02-01-01-10-01-521.png, image-2023-02-01-01-19-12-182.png, 
> image-2023-02-01-16-47-23-756.png, image-2023-02-01-16-57-43-889.png
>
>
> {code:java}
> Caused by: java.lang.Exception: Exception while creating 
> StreamOperatorStateContext.
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:256)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:722)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:698)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:665)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for WindowOperator_0a448493b4782967b150582570326227_(2/4) from 
> any of the 1 provided restore options.
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:353)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:165)
>   ... 11 more
> Caused by: java.lang.RuntimeException: java.io.FileNotFoundException: 
> /tmp/junit1835099326935900400/junit1113650082510421526/52ee65b7-033f-4429-8ddd-adbe85e27ced
>  (No such file or directory)
>   at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
>   at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:87)
>   at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.hasNext(StateChangelogHandleStreamHandleReader.java:69)
>   at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.readBackendHandle(ChangelogBackendRestoreOperation.java:96)
>   at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:75)
>   at 
> org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:92)
>   at 
> org.apache.flink.state.changelog.AbstractChangelogStateBackend.createKeyedStateBackend(AbstractChangelogStateBackend.java:136)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:336)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRest

[jira] [Comment Edited] (FLINK-28440) EventTimeWindowCheckpointingITCase failed with restore

2023-02-01 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682912#comment-17682912
 ] 

Feifan Wang edited comment on FLINK-28440 at 2/1/23 8:48 AM:
-

The updated _FsStateChangelogWriterTest#testChangelogFileNotFound1_ can be 
described by the following figure :

!image-2023-02-01-16-47-23-756.png|width=715,height=331!

 

[~Yanfei Lei] 


was (Author: feifan wang):
The updated _FsStateChangelogWriterTest#testChangelogFileNotFound1_ can be 
described by the following figure :

!image-2023-02-01-16-47-23-756.png|width=715,height=331!

> EventTimeWindowCheckpointingITCase failed with restore
> --
>
> Key: FLINK-28440
> URL: https://issues.apache.org/jira/browse/FLINK-28440
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Yanfei Lei
>Priority: Critical
>  Labels: auto-deprioritized-critical, pull-request-available, 
> test-stability
> Attachments: image-2023-02-01-00-51-54-506.png, 
> image-2023-02-01-01-10-01-521.png, image-2023-02-01-01-19-12-182.png, 
> image-2023-02-01-16-47-23-756.png
>
>
> {code:java}
> Caused by: java.lang.Exception: Exception while creating 
> StreamOperatorStateContext.
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:256)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:722)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:698)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:665)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for WindowOperator_0a448493b4782967b150582570326227_(2/4) from 
> any of the 1 provided restore options.
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:353)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:165)
>   ... 11 more
> Caused by: java.lang.RuntimeException: java.io.FileNotFoundException: 
> /tmp/junit1835099326935900400/junit1113650082510421526/52ee65b7-033f-4429-8ddd-adbe85e27ced
>  (No such file or directory)
>   at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
>   at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:87)
>   at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.hasNext(StateChangelogHandleStreamHandleReader.java:69)
>   at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.readBackendHandle(ChangelogBackendRestoreOperation.java:96)
>   at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:75)
>   at 
> org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:92)
>   at 
> org.apache.flink.state.changelog.AbstractChangelogStateBackend.createKeyedStateBackend(AbstractChangelogStateBackend.java:136)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:336)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>   ... 13 more
> Caused by:

[jira] [Updated] (FLINK-28440) EventTimeWindowCheckpointingITCase failed with restore

2023-02-01 Thread Feifan Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feifan Wang updated FLINK-28440:

Attachment: image-2023-02-01-16-47-23-756.png

> EventTimeWindowCheckpointingITCase failed with restore
> --
>
> Key: FLINK-28440
> URL: https://issues.apache.org/jira/browse/FLINK-28440
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Yanfei Lei
>Priority: Critical
>  Labels: auto-deprioritized-critical, pull-request-available, 
> test-stability
> Attachments: image-2023-02-01-00-51-54-506.png, 
> image-2023-02-01-01-10-01-521.png, image-2023-02-01-01-19-12-182.png, 
> image-2023-02-01-16-47-23-756.png
>
>
> {code:java}
> Caused by: java.lang.Exception: Exception while creating 
> StreamOperatorStateContext.
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:256)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:722)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:698)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:665)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for WindowOperator_0a448493b4782967b150582570326227_(2/4) from 
> any of the 1 provided restore options.
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:353)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:165)
>   ... 11 more
> Caused by: java.lang.RuntimeException: java.io.FileNotFoundException: 
> /tmp/junit1835099326935900400/junit1113650082510421526/52ee65b7-033f-4429-8ddd-adbe85e27ced
>  (No such file or directory)
>   at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
>   at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:87)
>   at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.hasNext(StateChangelogHandleStreamHandleReader.java:69)
>   at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.readBackendHandle(ChangelogBackendRestoreOperation.java:96)
>   at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:75)
>   at 
> org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:92)
>   at 
> org.apache.flink.state.changelog.AbstractChangelogStateBackend.createKeyedStateBackend(AbstractChangelogStateBackend.java:136)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:336)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>   ... 13 more
> Caused by: java.io.FileNotFoundException: 
> /tmp/junit1835099326935900400/junit1113650082510421526/52ee65b7-033f-4429-8ddd-adbe85e27ced
>  (No such file or directory)
>   at java.io.FileInputStream.open0(Native Method)
>   at java.io.FileInputStream.open(FileInputStream.java:195)
>   at java.io.FileInputStream.<init>(FileInputStream.java:138)
>   at 
> org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
>   

[jira] [Commented] (FLINK-28440) EventTimeWindowCheckpointingITCase failed with restore

2023-02-01 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682912#comment-17682912
 ] 

Feifan Wang commented on FLINK-28440:
-

The updated _FsStateChangelogWriterTest#testChangelogFileNotFound1_ can be 
described by the following figure :

!image-2023-02-01-16-47-23-756.png|width=715,height=331!

> EventTimeWindowCheckpointingITCase failed with restore
> --
>
> Key: FLINK-28440
> URL: https://issues.apache.org/jira/browse/FLINK-28440
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Yanfei Lei
>Priority: Critical
>  Labels: auto-deprioritized-critical, pull-request-available, 
> test-stability
> Attachments: image-2023-02-01-00-51-54-506.png, 
> image-2023-02-01-01-10-01-521.png, image-2023-02-01-01-19-12-182.png, 
> image-2023-02-01-16-47-23-756.png
>
>
> {code:java}
> Caused by: java.lang.Exception: Exception while creating 
> StreamOperatorStateContext.
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:256)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:722)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:698)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:665)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for WindowOperator_0a448493b4782967b150582570326227_(2/4) from 
> any of the 1 provided restore options.
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:353)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:165)
>   ... 11 more
> Caused by: java.lang.RuntimeException: java.io.FileNotFoundException: 
> /tmp/junit1835099326935900400/junit1113650082510421526/52ee65b7-033f-4429-8ddd-adbe85e27ced
>  (No such file or directory)
>   at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
>   at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:87)
>   at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.hasNext(StateChangelogHandleStreamHandleReader.java:69)
>   at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.readBackendHandle(ChangelogBackendRestoreOperation.java:96)
>   at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:75)
>   at 
> org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:92)
>   at 
> org.apache.flink.state.changelog.AbstractChangelogStateBackend.createKeyedStateBackend(AbstractChangelogStateBackend.java:136)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:336)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>   ... 13 more
> Caused by: java.io.FileNotFoundException: 
> /tmp/junit1835099326935900400/junit1113650082510421526/52ee65b7-033f-4429-8ddd-adbe85e27ced
>  (No such file or directory)
>   at java.io.FileInputStream.open0(Native Method)
>   at java.io.FileInputStream.open(FileInputSt

[jira] [Updated] (FLINK-28440) EventTimeWindowCheckpointingITCase failed with restore

2023-02-01 Thread Feifan Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feifan Wang updated FLINK-28440:

Attachment: image-2023-02-01-16-46-19-254.png

> EventTimeWindowCheckpointingITCase failed with restore
> --
>
> Key: FLINK-28440
> URL: https://issues.apache.org/jira/browse/FLINK-28440
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Yanfei Lei
>Priority: Critical
>  Labels: auto-deprioritized-critical, pull-request-available, 
> test-stability
> Attachments: image-2023-02-01-00-51-54-506.png, 
> image-2023-02-01-01-10-01-521.png, image-2023-02-01-01-19-12-182.png
>
>
> {code:java}
> Caused by: java.lang.Exception: Exception while creating 
> StreamOperatorStateContext.
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:256)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:722)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:698)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:665)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for WindowOperator_0a448493b4782967b150582570326227_(2/4) from 
> any of the 1 provided restore options.
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:353)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:165)
>   ... 11 more
> Caused by: java.lang.RuntimeException: java.io.FileNotFoundException: 
> /tmp/junit1835099326935900400/junit1113650082510421526/52ee65b7-033f-4429-8ddd-adbe85e27ced
>  (No such file or directory)
>   at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
>   at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:87)
>   at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.hasNext(StateChangelogHandleStreamHandleReader.java:69)
>   at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.readBackendHandle(ChangelogBackendRestoreOperation.java:96)
>   at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:75)
>   at 
> org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:92)
>   at 
> org.apache.flink.state.changelog.AbstractChangelogStateBackend.createKeyedStateBackend(AbstractChangelogStateBackend.java:136)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:336)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>   ... 13 more
> Caused by: java.io.FileNotFoundException: 
> /tmp/junit1835099326935900400/junit1113650082510421526/52ee65b7-033f-4429-8ddd-adbe85e27ced
>  (No such file or directory)
>   at java.io.FileInputStream.open0(Native Method)
>   at java.io.FileInputStream.open(FileInputStream.java:195)
>   at java.io.FileInputStream.<init>(FileInputStream.java:138)
>   at 
> org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
>   at 
> org.apache.flink.core.fs.loc

[jira] [Updated] (FLINK-28440) EventTimeWindowCheckpointingITCase failed with restore

2023-02-01 Thread Feifan Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feifan Wang updated FLINK-28440:

Attachment: (was: image-2023-02-01-16-46-19-254.png)

> EventTimeWindowCheckpointingITCase failed with restore
> --
>
> Key: FLINK-28440
> URL: https://issues.apache.org/jira/browse/FLINK-28440
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Yanfei Lei
>Priority: Critical
>  Labels: auto-deprioritized-critical, pull-request-available, 
> test-stability
> Attachments: image-2023-02-01-00-51-54-506.png, 
> image-2023-02-01-01-10-01-521.png, image-2023-02-01-01-19-12-182.png
>
>
> {code:java}
> Caused by: java.lang.Exception: Exception while creating 
> StreamOperatorStateContext.
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:256)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:722)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:698)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:665)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for WindowOperator_0a448493b4782967b150582570326227_(2/4) from 
> any of the 1 provided restore options.
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:353)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:165)
>   ... 11 more
> Caused by: java.lang.RuntimeException: java.io.FileNotFoundException: 
> /tmp/junit1835099326935900400/junit1113650082510421526/52ee65b7-033f-4429-8ddd-adbe85e27ced
>  (No such file or directory)
>   at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
>   at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:87)
>   at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.hasNext(StateChangelogHandleStreamHandleReader.java:69)
>   at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.readBackendHandle(ChangelogBackendRestoreOperation.java:96)
>   at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:75)
>   at 
> org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:92)
>   at 
> org.apache.flink.state.changelog.AbstractChangelogStateBackend.createKeyedStateBackend(AbstractChangelogStateBackend.java:136)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:336)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>   ... 13 more
> Caused by: java.io.FileNotFoundException: 
> /tmp/junit1835099326935900400/junit1113650082510421526/52ee65b7-033f-4429-8ddd-adbe85e27ced
>  (No such file or directory)
>   at java.io.FileInputStream.open0(Native Method)
>   at java.io.FileInputStream.open(FileInputStream.java:195)
>   at java.io.FileInputStream.<init>(FileInputStream.java:138)
>   at 
> org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
>   at 
> org.apache.flink.

[jira] [Commented] (FLINK-28440) EventTimeWindowCheckpointingITCase failed with restore

2023-01-31 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682896#comment-17682896
 ] 

Feifan Wang commented on FLINK-28440:
-

[~Yanfei Lei], thank you for the reminder; the previous description and test 
did not consider stopTracking(). But even when stopTracking() is considered, 
the problem I mentioned still exists. I have updated 
_FsStateChangelogWriterTest#testChangelogFileNotFound1_; please take a look.

> EventTimeWindowCheckpointingITCase failed with restore
> --
>
> Key: FLINK-28440
> URL: https://issues.apache.org/jira/browse/FLINK-28440
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Yanfei Lei
>Priority: Critical
>  Labels: auto-deprioritized-critical, pull-request-available, 
> test-stability
> Attachments: image-2023-02-01-00-51-54-506.png, 
> image-2023-02-01-01-10-01-521.png, image-2023-02-01-01-19-12-182.png
>
>
> {code:java}
> Caused by: java.lang.Exception: Exception while creating 
> StreamOperatorStateContext.
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:256)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:722)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:698)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:665)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for WindowOperator_0a448493b4782967b150582570326227_(2/4) from 
> any of the 1 provided restore options.
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:353)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:165)
>   ... 11 more
> Caused by: java.lang.RuntimeException: java.io.FileNotFoundException: 
> /tmp/junit1835099326935900400/junit1113650082510421526/52ee65b7-033f-4429-8ddd-adbe85e27ced
>  (No such file or directory)
>   at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
>   at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:87)
>   at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.hasNext(StateChangelogHandleStreamHandleReader.java:69)
>   at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.readBackendHandle(ChangelogBackendRestoreOperation.java:96)
>   at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:75)
>   at 
> org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:92)
>   at 
> org.apache.flink.state.changelog.AbstractChangelogStateBackend.createKeyedStateBackend(AbstractChangelogStateBackend.java:136)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:336)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>   ... 13 more
> Caused by: java.io.FileNotFoundException: 
> /tmp/junit1835099326935900400/junit1113650082510421526/52ee65b7-033f-4429-8ddd-adbe85e27ced
>  (No such file or directory)
>   at java.io.FileInpu
