[jira] [Updated] (FLINK-25528) state processor api do not support increment checkpoint
[ https://issues.apache.org/jira/browse/FLINK-25528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang updated FLINK-25528: - Fix Version/s: 1.16.0 > state processor api do not support increment checkpoint > --- > > Key: FLINK-25528 > URL: https://issues.apache.org/jira/browse/FLINK-25528 > Project: Flink > Issue Type: Improvement > Components: API / State Processor, Runtime / State Backends >Reporter: 刘方奇 >Assignee: 刘方奇 >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0 > > > As the title says, the state-processor-api uses the savepoint option to > snapshot state by default in org.apache.flink.state.api.output.SnapshotUtils. > But in many cases we may need to snapshot state incrementally, which performs > better than a savepoint. > Shall we add a config option to choose the checkpoint type, just like the Flink streaming > backend? -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-26050) Too many small sst files in rocksdb state backend when using processing time window
[ https://issues.apache.org/jira/browse/FLINK-26050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17495303#comment-17495303 ] Yun Tang commented on FLINK-26050: -- [~shenjiaqi] Since Flink has disabled the WAL, the suggestion of configuring {{log_size_for_flush}} should not work here. This seems to be a limitation of RocksDB's level compaction. I suggest storing those timers on the heap instead, given that there are not that many timers; this works fine in many cases. > Too many small sst files in rocksdb state backend when using processing time > window > --- > > Key: FLINK-26050 > URL: https://issues.apache.org/jira/browse/FLINK-26050 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.10.2, 1.14.3 >Reporter: shen >Priority: Major > Attachments: image-2022-02-09-21-22-13-920.png, > image-2022-02-11-10-32-14-956.png, image-2022-02-11-10-36-46-630.png, > image-2022-02-14-13-04-52-325.png > > > When using a processing time window, some workloads produce a lot of > small sst files (several KB each) in the RocksDB local directory, which may cause a "Too > many files" error. > Inspecting the sst files with the RocksDB tool ldb shows: > * the column family of these small sst files is "processing_window-timers". > * most sst files are in level-1. > * almost all records in the sst files are kTypeDeletion. > * the creation times of the sst files correspond to the checkpoint interval. > These small sst files seem to be generated when a Flink checkpoint is > triggered. Although all content in these sst files consists of delete tags, they are not > compacted and deleted during RocksDB compaction because they do not intersect with > each other (see RocksDB [compaction trivial > move|https://github.com/facebook/rocksdb/wiki/Compaction-Trivial-Move]). > There seems to be no chance to delete them, because they are small and do not > intersect with other sst files. > > I will attach a simple program to reproduce the problem. 
> > Timers in a processing time window are generated in strictly ascending > order (both put and delete). So if the job's workload happens to generate level-0 > sst files that do not intersect with each other (for example: the processing window size is > much smaller than the checkpoint interval, and no window content crosses a checkpoint > interval or no new data arrives in a window crossing a checkpoint interval), > many small sst files will be generated until the job is restored from a savepoint or > incremental checkpointing is disabled. > > A similar problem may exist when users use timers in operators with the same > workload. > > Code to reproduce the problem: > {code:java} > package org.apache.flink.jira; > import lombok.extern.slf4j.Slf4j; > import org.apache.flink.configuration.Configuration; > import org.apache.flink.configuration.RestOptions; > import org.apache.flink.configuration.TaskManagerOptions; > import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; > import org.apache.flink.streaming.api.TimeCharacteristic; > import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; > import org.apache.flink.streaming.api.datastream.DataStreamSource; > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.streaming.api.functions.source.SourceFunction; > import > org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; > import > org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; > import org.apache.flink.streaming.api.windowing.time.Time; > import org.apache.flink.streaming.api.windowing.windows.TimeWindow; > import org.apache.flink.util.Collector; > import java.util.Collections; > import java.util.List; > import java.util.Random; > @Slf4j > public class StreamApp { > public static void main(String[] args) throws Exception { > Configuration config = new Configuration(); > config.set(RestOptions.ADDRESS, "127.0.0.1"); > config.set(RestOptions.PORT, 10086); > 
config.set(TaskManagerOptions.NUM_TASK_SLOTS, 6); > new > StreamApp().configureApp(StreamExecutionEnvironment.createLocalEnvironment(1, > config)); > } > public void configureApp(StreamExecutionEnvironment env) throws Exception { > env.enableCheckpointing(20000); // 20sec (interval is given in milliseconds) > RocksDBStateBackend rocksDBStateBackend = > new > RocksDBStateBackend("file:///Users/shenjiaqi/Workspace/jira/flink-51/checkpoints/", > true); // need to be reconfigured > > rocksDBStateBackend.setDbStoragePath("/Users/shenjiaqi/Workspace/jira/flink-51/flink/rocksdb_local_db"); > // need to be reconfigured > env.setStateBackend(rocksDBStateBackend); > env.getCheckpointConfig().setCheckpointTimeout(10); > env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5); > env.setParallelism(1)
[jira] [Commented] (FLINK-26196) error when Incremental Checkpoints by RocksDb
[ https://issues.apache.org/jira/browse/FLINK-26196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17495296#comment-17495296 ] Yun Tang commented on FLINK-26196: -- [~hjw], apart from providing more logs, what kind of environment did you use? A Docker image on ECS, or a YARN container on a Linux machine? > error when Incremental Checkpoints by RocksDb > --- > > Key: FLINK-26196 > URL: https://issues.apache.org/jira/browse/FLINK-26196 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing, Runtime / State Backends >Affects Versions: 1.13.2 >Reporter: hjw >Priority: Critical > > When I use incremental checkpoints with RocksDB, errors happen occasionally. > Fortunately, the Flink job keeps running normally. > Log: > {code:java} > java.io.IOException: Could not perform checkpoint 2804 for operator > cc-rule-keyByAndReduceStream (2/8)#1. > at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1045) > at > org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:135) > at > org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:250) > at > org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:61) > at > org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:431) > at > org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:61) > at > org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:227) > at > 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180) > at > org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158) > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not > complete snapshot 2804 for operator cc-rule-keyByAndReduceStream (2/8)#1. > Failure reason: Checkpoint was declined. 
> at > org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:264) > at > org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:169) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371) > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:706) > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:627) > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:590) > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:312) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:1089) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1073) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1029) > ... 19 more > Caused by: org.roc
[jira] [Commented] (FLINK-25528) state processor api do not support increment checkpoint
[ https://issues.apache.org/jira/browse/FLINK-25528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17494454#comment-17494454 ] Yun Tang commented on FLINK-25528: -- [~liufangqi] Thanks for your explanation. If the current state processor API cannot write a native checkpoint, I think we could have it write a full native checkpoint instead of a savepoint. This deserves an improvement patch. BTW, I think you can refer to https://issues.apache.org/jira/browse/FLINK-25276 to change your PR. > state processor api do not support increment checkpoint > --- > > Key: FLINK-25528 > URL: https://issues.apache.org/jira/browse/FLINK-25528 > Project: Flink > Issue Type: Improvement > Components: API / State Processor, Runtime / State Backends >Reporter: 刘方奇 >Priority: Major > > As the title says, the state-processor-api uses the savepoint option to > snapshot state by default in org.apache.flink.state.api.output.SnapshotUtils. > But in many cases we may need to snapshot state incrementally, which performs > better than a savepoint. > Shall we add a config option to choose the checkpoint type, just like the Flink streaming > backend? -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-25528) state processor api do not support increment checkpoint
[ https://issues.apache.org/jira/browse/FLINK-25528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17494345#comment-17494345 ] Yun Tang commented on FLINK-25528: -- [~liufangqi] I don't understand the title. Actually, the state processor API can work with incremental checkpoints for both reading and writing. Have you tried loading an incremental checkpoint (the checkpoint path to load should be {{path/chk-x}})? > state processor api do not support increment checkpoint > --- > > Key: FLINK-25528 > URL: https://issues.apache.org/jira/browse/FLINK-25528 > Project: Flink > Issue Type: Improvement > Components: API / State Processor, Runtime / State Backends >Reporter: 刘方奇 >Priority: Major > > As the title says, the state-processor-api uses the savepoint option to > snapshot state by default in org.apache.flink.state.api.output.SnapshotUtils. > But in many cases we may need to snapshot state incrementally, which performs > better than a savepoint. > Shall we add a config option to choose the checkpoint type, just like the Flink streaming > backend? -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Closed] (FLINK-26170) checkpoint time too long
[ https://issues.apache.org/jira/browse/FLINK-26170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang closed FLINK-26170. Resolution: Information Provided > checkpoint time too long > --- > > Key: FLINK-26170 > URL: https://issues.apache.org/jira/browse/FLINK-26170 > Project: Flink > Issue Type: Bug >Affects Versions: 1.14.3 >Reporter: zhangqw152 >Priority: Major > Attachments: image-2022-02-16-09-46-42-675.png, > image-2022-02-16-09-54-48-482.png > > > !image-2022-02-16-09-46-42-675.png! > !image-2022-02-16-09-54-48-482.png! -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-26170) checkpoint time too long
[ https://issues.apache.org/jira/browse/FLINK-26170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17492972#comment-17492972 ] Yun Tang commented on FLINK-26170: -- [~zhangqw152], the checkpoint duration depends on many possible factors: the job may be backpressured, or some customized operator may hang during the synchronous phase of the checkpoint. The information you provided is not enough to diagnose anything. Moreover, the user mailing list is a better place to ask why a checkpoint expired. Since the current ticket description does not provide any useful information, I will close this ticket. If we can identify that this really is a bug, we can then create a specific ticket targeting that bug. > checkpoint time too long > --- > > Key: FLINK-26170 > URL: https://issues.apache.org/jira/browse/FLINK-26170 > Project: Flink > Issue Type: Bug >Affects Versions: 1.14.3 >Reporter: zhangqw152 >Priority: Major > Attachments: image-2022-02-16-09-46-42-675.png, > image-2022-02-16-09-54-48-482.png > > > !image-2022-02-16-09-46-42-675.png! > !image-2022-02-16-09-54-48-482.png! -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-26170) checkpoint time too long
[ https://issues.apache.org/jira/browse/FLINK-26170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang updated FLINK-26170: - Priority: Major (was: Critical) > checkpoint time too long > --- > > Key: FLINK-26170 > URL: https://issues.apache.org/jira/browse/FLINK-26170 > Project: Flink > Issue Type: Bug >Affects Versions: 1.14.3 >Reporter: zhangqw152 >Priority: Major > Attachments: image-2022-02-16-09-46-42-675.png, > image-2022-02-16-09-54-48-482.png > > > !image-2022-02-16-09-46-42-675.png! > !image-2022-02-16-09-54-48-482.png! -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Resolved] (FLINK-26101) Avoid shared state registry to discard multi-registered identical changelog state
[ https://issues.apache.org/jira/browse/FLINK-26101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang resolved FLINK-26101. -- Resolution: Fixed Merged in master: 8b25ae64794727e49de66d0b61e0955da7f21961 > Avoid shared state registry to discard multi-registered identical changelog > state > - > > Key: FLINK-26101 > URL: https://issues.apache.org/jira/browse/FLINK-26101 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > Under the changelog state backend, we register the same materialized keyed > state handle multiple times, and {{SharedStateRegistryImpl}} will discard the > duplicated state handle. > {code:java} > if (!Objects.equals(state, entry.stateHandle)) { > if (entry.confirmed || isPlaceholder(state)) { > scheduledStateDeletion = state; > } else { > // Old entry is not in a confirmed checkpoint yet, and the new one > differs. > // This might result from (omitted KG range here for simplicity): > // 1. Flink recovers from a failure using a checkpoint 1 > // 2. State Backend is initialized to UID xyz and a set of SST: { > 01.sst } > // 3. JM triggers checkpoint 2 > // 4. TM sends handle: "xyz-002.sst"; JM registers it under > "xyz-002.sst" > // 5. TM crashes; everything is repeated from (2) > // 6. TM recovers from CP 1 again: backend UID "xyz", SST { 01.sst } > // 7. JM triggers checkpoint 3 > // 8. TM sends NEW state "xyz-002.sst" > // 9. JM discards it as duplicate > // 10. checkpoint completes, but a wrong SST file is used > // So we use a new entry and discard the old one: > scheduledStateDeletion = entry.stateHandle; > entry.stateHandle = state; > } > {code} > Thus, we need to implement the {{#equals}} method for the registered state > handles. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-26101) Avoid shared state registry to discard multi-registered identical changelog state
[ https://issues.apache.org/jira/browse/FLINK-26101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang updated FLINK-26101: - Summary: Avoid shared state registry to discard multi-registered identical changelog state (was: Avoid shared state registry to discard multi-registered changelog state) > Avoid shared state registry to discard multi-registered identical changelog > state > - > > Key: FLINK-26101 > URL: https://issues.apache.org/jira/browse/FLINK-26101 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Major > Fix For: 1.15.0 > > > Under the changelog state backend, we register the same materialized keyed > state handle multiple times, and {{SharedStateRegistryImpl}} will discard the > duplicated state handle. > {code:java} > if (!Objects.equals(state, entry.stateHandle)) { > if (entry.confirmed || isPlaceholder(state)) { > scheduledStateDeletion = state; > } else { > // Old entry is not in a confirmed checkpoint yet, and the new one > differs. > // This might result from (omitted KG range here for simplicity): > // 1. Flink recovers from a failure using a checkpoint 1 > // 2. State Backend is initialized to UID xyz and a set of SST: { > 01.sst } > // 3. JM triggers checkpoint 2 > // 4. TM sends handle: "xyz-002.sst"; JM registers it under > "xyz-002.sst" > // 5. TM crashes; everything is repeated from (2) > // 6. TM recovers from CP 1 again: backend UID "xyz", SST { 01.sst } > // 7. JM triggers checkpoint 3 > // 8. TM sends NEW state "xyz-002.sst" > // 9. JM discards it as duplicate > // 10. checkpoint completes, but a wrong SST file is used > // So we use a new entry and discard the old one: > scheduledStateDeletion = entry.stateHandle; > entry.stateHandle = state; > } > {code} > Thus, we need to implement the {{#equals}} method for the registered state > handles. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-26101) Avoid shared state registry to discard multi-registered changelog state
[ https://issues.apache.org/jira/browse/FLINK-26101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang updated FLINK-26101: - Summary: Avoid shared state registry to discard multi-registered changelog state (was: Avoid shared state registry to discard duplicate changelog state) > Avoid shared state registry to discard multi-registered changelog state > --- > > Key: FLINK-26101 > URL: https://issues.apache.org/jira/browse/FLINK-26101 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Major > Fix For: 1.15.0 > > > Under the changelog state backend, we register the same materialized keyed > state handle multiple times, and {{SharedStateRegistryImpl}} will discard the > duplicated state handle. > {code:java} > if (!Objects.equals(state, entry.stateHandle)) { > if (entry.confirmed || isPlaceholder(state)) { > scheduledStateDeletion = state; > } else { > // Old entry is not in a confirmed checkpoint yet, and the new one > differs. > // This might result from (omitted KG range here for simplicity): > // 1. Flink recovers from a failure using a checkpoint 1 > // 2. State Backend is initialized to UID xyz and a set of SST: { > 01.sst } > // 3. JM triggers checkpoint 2 > // 4. TM sends handle: "xyz-002.sst"; JM registers it under > "xyz-002.sst" > // 5. TM crashes; everything is repeated from (2) > // 6. TM recovers from CP 1 again: backend UID "xyz", SST { 01.sst } > // 7. JM triggers checkpoint 3 > // 8. TM sends NEW state "xyz-002.sst" > // 9. JM discards it as duplicate > // 10. checkpoint completes, but a wrong SST file is used > // So we use a new entry and discard the old one: > scheduledStateDeletion = entry.stateHandle; > entry.stateHandle = state; > } > {code} > Thus, we need to implement the {{#equals}} method for the registered state > handles. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26101) Avoid shared state registry to discard duplicate changelog state
Yun Tang created FLINK-26101: Summary: Avoid shared state registry to discard duplicate changelog state Key: FLINK-26101 URL: https://issues.apache.org/jira/browse/FLINK-26101 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Reporter: Yun Tang Assignee: Yun Tang Fix For: 1.15.0 Under the changelog state backend, we register the same materialized keyed state handle multiple times, and {{SharedStateRegistryImpl}} will discard the duplicated state handle. {code:java} if (!Objects.equals(state, entry.stateHandle)) { if (entry.confirmed || isPlaceholder(state)) { scheduledStateDeletion = state; } else { // Old entry is not in a confirmed checkpoint yet, and the new one differs. // This might result from (omitted KG range here for simplicity): // 1. Flink recovers from a failure using a checkpoint 1 // 2. State Backend is initialized to UID xyz and a set of SST: { 01.sst } // 3. JM triggers checkpoint 2 // 4. TM sends handle: "xyz-002.sst"; JM registers it under "xyz-002.sst" // 5. TM crashes; everything is repeated from (2) // 6. TM recovers from CP 1 again: backend UID "xyz", SST { 01.sst } // 7. JM triggers checkpoint 3 // 8. TM sends NEW state "xyz-002.sst" // 9. JM discards it as duplicate // 10. checkpoint completes, but a wrong SST file is used // So we use a new entry and discard the old one: scheduledStateDeletion = entry.stateHandle; entry.stateHandle = state; } {code} Thus, we need to implement the {{#equals}} method for the registered state handles. -- This message was sent by Atlassian Jira (v8.20.1#820001)
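The effect described in FLINK-26101 can be illustrated with a small, self-contained sketch. Everything below is hypothetical and heavily simplified: {{IdentityHandle}}, {{ValueHandle}}, {{Registry}} and the in-memory {{FILES}} set are stand-ins invented for illustration and are not Flink classes. The registry only mimics the "unconfirmed entry" branch of the snippet quoted above, where a handle that is not equal to the registered one replaces it and the old one is discarded.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/** Illustrative sketch only; none of these types are Flink's real classes. */
public class EqualsAwareRegistrySketch {

    /** Pretend remote storage: the set of file paths that still exist. */
    static final Set<String> FILES = new HashSet<>();

    /** Hypothetical handle WITHOUT equals(): two instances for the same file are "different". */
    public static class IdentityHandle {
        final String path;

        public IdentityHandle(String path) {
            this.path = path;
        }

        /** Discarding a handle deletes the file it points to. */
        void discard() {
            FILES.remove(path);
        }
    }

    /** Hypothetical handle WITH equals()/hashCode() based on the file it points to. */
    public static final class ValueHandle extends IdentityHandle {
        public ValueHandle(String path) {
            super(path);
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof ValueHandle && path.equals(((ValueHandle) o).path);
        }

        @Override
        public int hashCode() {
            return path.hashCode();
        }
    }

    /** Simplified registry following the "old entry is not confirmed" branch quoted above. */
    static final class Registry {
        private final Map<String, IdentityHandle> entries = new HashMap<>();

        void register(String key, IdentityHandle state) {
            IdentityHandle old = entries.get(key);
            if (old == null) {
                entries.put(key, state);
            } else if (!Objects.equals(state, old)) {
                old.discard();           // the old entry is scheduled for deletion
                entries.put(key, state); // the new entry replaces it
            }
            // equal handles: nothing to do, the existing entry is kept
        }
    }

    /** Registers two handle instances for the same file and reports whether the file survives. */
    public static boolean fileSurvivesDoubleRegistration(boolean withEquals) {
        FILES.clear();
        String path = "xyz-002.sst";
        FILES.add(path);
        Registry registry = new Registry();
        registry.register(path, withEquals ? new ValueHandle(path) : new IdentityHandle(path));
        registry.register(path, withEquals ? new ValueHandle(path) : new IdentityHandle(path));
        return FILES.contains(path);
    }

    public static void main(String[] args) {
        System.out.println("without equals: file survives = " + fileSurvivesDoubleRegistration(false));
        System.out.println("with equals:    file survives = " + fileSurvivesDoubleRegistration(true));
    }
}
```

In this toy model, registering a second instance that points to the same file deletes the file unless the handle type defines value-based equality, which is the motivation the ticket gives for implementing {{#equals}}. How the actual fix is structured is not shown in this thread.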
[jira] [Resolved] (FLINK-25466) TTL configuration could parse in StateTtlConfig#DISABLED
[ https://issues.apache.org/jira/browse/FLINK-25466?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang resolved FLINK-25466. -- Resolution: Fixed merged master: c1c966568b50c03257428e018f3cacd88b7dd116 release-1.14: 30994a1788085034ed1b467a5df6253ee44b1da6 release-1.13: ab86ffa78a2952126d90a4d4fa3ce0b16b957991 > TTL configuration could parse in StateTtlConfig#DISABLED > > > Key: FLINK-25466 > URL: https://issues.apache.org/jira/browse/FLINK-25466 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0, 1.14.4, 1.13.7 > > > The current API {{StateDescriptor#enableTimeToLive(StateTtlConfig)}} cannot > handle {{StateTtlConfig#DISABLED}} due to its current implementation. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Resolved] (FLINK-25478) Same materialized state handle should not register multi times
[ https://issues.apache.org/jira/browse/FLINK-25478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang resolved FLINK-25478. -- Resolution: Fixed Merged in master: ae166080fd1458b6e761abcf18d9302a4dcefbcd and a519ed1b11e7bf085d3eee6c6f39cab5967269d0 > Same materialized state handle should not register multi times > -- > > Key: FLINK-25478 > URL: https://issues.apache.org/jira/browse/FLINK-25478 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Checkpointing, Runtime / State Backends >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Critical > Labels: pull-request-available > Fix For: 1.15.0 > > > Currently, changelog materialization calls the RocksDB state backend's > snapshot method to generate an {{IncrementalRemoteKeyedStateHandle}} as > ChangelogStateBackendHandleImpl's materialized artifact. Until the next > materialization, it will always report the same > {{IncrementalRemoteKeyedStateHandle}} as before. > It's fine to register this handle the 1st time. However, the 2nd time the > {{IncrementalRemoteKeyedStateHandle}} is registered (via > {{ChangelogStateBackendHandleImpl#registerSharedStates}}), it will > discard the private state artifacts without checking the registered reference: > IncrementalRemoteKeyedStateHandle: > {code:java} > public void discardState() throws Exception { > try { > StateUtil.bestEffortDiscardAllStateObjects(privateState.values()); > } catch (Exception e) { > LOG.warn("Could not properly discard misc file states.", e); > } > } > {code} > Thus, this deletes the private state (such as RocksDB's MANIFEST), and > on restore the job reports a FileNotFoundException. > > {code:java} > Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught > unexpected exception. 
> at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:403) > at > org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:477) > > at > org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:90) > > at > org.apache.flink.state.changelog.ChangelogStateBackend.lambda$createKeyedStateBackend$1(ChangelogStateBackend.java:153) > at > org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:68) > at > org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:221) > > at > org.apache.flink.state.changelog.ChangelogStateBackend.createKeyedStateBackend(ChangelogStateBackend.java:145) > > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328) > > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) > > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) > > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) > > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) > > ... 
10 more > Caused by: java.io.FileNotFoundException: x > at > org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.open(PluginFileSystemFactory.java:127) > > at > org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68) > > at > org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:127) > > at > org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:110) > at > org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:49) > > at > java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626) > ~[?:1.8.0_102] > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147) > ~[?:1.8.0_102] > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622) > ~[?:1.8.0_102] > ... 1 more {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-25557) Introduce incremental/full checkpoint size stats
[ https://issues.apache.org/jira/browse/FLINK-25557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang updated FLINK-25557: - Release Note: Introduce metrics for the bytes persisted by each checkpoint (exposed via the REST API and the UI), which help users see how much data was persisted during an incremental or changelog-based checkpoint. > Introduce incremental/full checkpoint size stats > > > Key: FLINK-25557 > URL: https://issues.apache.org/jira/browse/FLINK-25557 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Checkpointing, Runtime / Metrics, Runtime / > REST, Runtime / State Backends >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > Currently, the "checkpointed data size" is the incremental checkpoint size > if incremental checkpointing is enabled, and the full checkpoint size otherwise. This > makes it hard for users to know the exact incremental and full checkpoint sizes. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-26062) [Changelog] Non-deterministic recovery of PriorityQueue states
[ https://issues.apache.org/jira/browse/FLINK-26062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17489930#comment-17489930 ] Yun Tang commented on FLINK-26062: -- I think this is also true for the InternalPriorityQueue#peek() operation, as the order is non-deterministic on recovery. Since peek is not a write operation, the changelog cannot record it. However, this might not affect the expected behavior, right? > [Changelog] Non-deterministic recovery of PriorityQueue states > -- > > Key: FLINK-26062 > URL: https://issues.apache.org/jira/browse/FLINK-26062 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.15.0 >Reporter: Roman Khachatryan >Assignee: Roman Khachatryan >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > Currently, InternalPriorityQueue.poll() is logged as a separate operation, > without specifying the element that has been polled. On recovery, this > recorded poll() is replayed. > However, this is not deterministic because the order of PQ elements with > equal priority is not specified. For example, TimerHeapInternalTimer only > compares timestamps, which are often equal. This results in polling timers > from the queue in the wrong order => dropping timers => and not firing timers. > > ProcessingTimeWindowCheckpointingITCase.testAggregatingSlidingProcessingTimeWindow > fails with materialization enabled and using the heap state backend (both > in-memory and fs-based implementations). > > The proposed solution is to replace poll with a remove operation (which is based on > equality). > > cc: [~masteryhx], [~ym], [~yunta] -- This message was sent by Atlassian Jira (v8.20.1#820001)
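The poll-versus-remove argument in FLINK-26062 can be sketched with the JDK's own java.util.PriorityQueue. This is only an analogy under stated assumptions: {{Timer}} is a hypothetical class invented here (ordered by timestamp only, equal only if the key also matches), not Flink's TimerHeapInternalTimer, and the JDK queue stands in for Flink's InternalPriorityQueue. The JDK documents that ties are broken arbitrarily; in this sketch the tie is resolved differently depending on the order in which the queue was rebuilt.

```java
import java.util.Comparator;
import java.util.Objects;
import java.util.PriorityQueue;

/** Illustrative sketch only; an analogy using the JDK queue, not Flink's implementation. */
public class PollVsRemoveSketch {

    /** Hypothetical timer: ordered by timestamp only, equal only if the key matches too. */
    public static final class Timer {
        final long timestamp;
        final String key;

        public Timer(long timestamp, String key) {
            this.timestamp = timestamp;
            this.key = key;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Timer
                    && timestamp == ((Timer) o).timestamp
                    && key.equals(((Timer) o).key);
        }

        @Override
        public int hashCode() {
            return Objects.hash(timestamp, key);
        }

        @Override
        public String toString() {
            return key + "@" + timestamp;
        }
    }

    /** Builds a queue that compares timers by timestamp only, inserting in the given order. */
    static PriorityQueue<Timer> queueOf(Timer... timers) {
        PriorityQueue<Timer> queue =
                new PriorityQueue<>(Comparator.comparingLong((Timer t) -> t.timestamp));
        for (Timer t : timers) {
            queue.add(t);
        }
        return queue;
    }

    /** Replays a logged "poll()" that did not record which element was polled. */
    public static Timer remainingAfterReplayedPoll(Timer... restoredOrder) {
        PriorityQueue<Timer> queue = queueOf(restoredOrder);
        queue.poll();
        return queue.peek();
    }

    /** Replays a logged "remove(element)", which identifies the element by equality. */
    public static Timer remainingAfterReplayedRemove(Timer polledOriginally, Timer... restoredOrder) {
        PriorityQueue<Timer> queue = queueOf(restoredOrder);
        queue.remove(polledOriginally);
        return queue.peek();
    }

    public static void main(String[] args) {
        Timer a = new Timer(100L, "a");
        Timer b = new Timer(100L, "b");
        // Same logical content, two different rebuild orders after recovery.
        System.out.println("poll,   order (a, b): remaining " + remainingAfterReplayedPoll(a, b));
        System.out.println("poll,   order (b, a): remaining " + remainingAfterReplayedPoll(b, a));
        System.out.println("remove, order (a, b): remaining " + remainingAfterReplayedRemove(a, a, b));
        System.out.println("remove, order (b, a): remaining " + remainingAfterReplayedRemove(a, b, a));
    }
}
```

Replaying an anonymous poll() can leave a different timer behind depending on how the queue was rebuilt, while replaying a remove of the recorded element leaves the same timer in both cases. That matches the reasoning behind the proposed solution; the details of the actual change are not part of this thread.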
[jira] [Commented] (FLINK-25835) The task initialization duration is recorded in logs
[ https://issues.apache.org/jira/browse/FLINK-25835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17484354#comment-17484354 ] Yun Tang commented on FLINK-25835: -- The current state backend prints the time taken to restore the state handles. Would that be enough? > The task initialization duration is recorded in logs > > > Key: FLINK-25835 > URL: https://issues.apache.org/jira/browse/FLINK-25835 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Affects Versions: 1.12.2, 1.15.0 >Reporter: Bo Cui >Priority: Major > Labels: pull-request-available > > [https://github.com/apache/flink/blob/a543e658acfbc22c1579df0d043654037b9ec4b0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L644] > We are testing the time of state backend initialization for different data > levels. However, the task initialization time cannot be obtained from the log > file and the time taken to restore the state in the backend cannot be > obtained. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-25862) Refactor SharedStateRegistry to not limit StreamStateHandle to register/unregister
[ https://issues.apache.org/jira/browse/FLINK-25862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang updated FLINK-25862: - Summary: Refactor SharedStateRegistry to not limit StreamStateHandle to register/unregister (was: Refactor SharedStateRegistry to limit StreamStateHandle to register/unregister) > Refactor SharedStateRegistry to not limit StreamStateHandle to > register/unregister > -- > > Key: FLINK-25862 > URL: https://issues.apache.org/jira/browse/FLINK-25862 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing, Runtime / State Backends >Reporter: Yun Tang >Priority: Major > > The current implementation of SharedStateRegistry uses `StreamStateHandle` > to register and unregister. This limits its usage by other components, > such as the change-log state backend handle. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Resolved] (FLINK-25767) Translation of page 'Working with State' is incomplete
[ https://issues.apache.org/jira/browse/FLINK-25767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang resolved FLINK-25767. -- Fix Version/s: 1.14.4 Resolution: Fixed Merged master: a424a9c7336c2e2c9de8406121efb5c0bb7257ea release-1.14: 517436abf607f2750e2c58b1fb659a86360ac43f > Translation of page 'Working with State' is incomplete > -- > > Key: FLINK-25767 > URL: https://issues.apache.org/jira/browse/FLINK-25767 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.14.3 >Reporter: Yao Zhang >Assignee: Yao Zhang >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0, 1.14.4 > > > The translation of page [Working with State | Apache > Flink|https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/datastream/fault-tolerance/state/] > is incomplete. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25872) Restoring from incremental checkpoint with changelog state-backend cannot work well with snapshot CLAIM mode
Yun Tang created FLINK-25872: Summary: Restoring from incremental checkpoint with changelog state-backend cannot work well with snapshot CLAIM mode Key: FLINK-25872 URL: https://issues.apache.org/jira/browse/FLINK-25872 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing, Runtime / State Backends Reporter: Yun Tang If we restore from an incremental checkpoint with the changelog state backend enabled in snapshot CLAIM mode, the restored checkpoint is discarded on subsume, so the private state in the incremental state handle could be deleted by mistake. This bug is similar to FLINK-25478. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25862) Refactor SharedStateRegistry to limit StreamStateHandle to register/unregister
Yun Tang created FLINK-25862: Summary: Refactor SharedStateRegistry to limit StreamStateHandle to register/unregister Key: FLINK-25862 URL: https://issues.apache.org/jira/browse/FLINK-25862 Project: Flink Issue Type: Improvement Components: Runtime / Checkpointing, Runtime / State Backends Reporter: Yun Tang The current implementation of SharedStateRegistry uses `StreamStateHandle` to register and unregister. This limits its usage by other components, such as the change-log state backend handle. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Reopened] (FLINK-25343) HBaseConnectorITCase.testTableSourceSinkWithDDL fail on azure
[ https://issues.apache.org/jira/browse/FLINK-25343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang reopened FLINK-25343: -- Reopen the issue as another instance happened: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=30251&view=logs&j=d44f43ce-542c-597d-bf94-b0718c71e5e8&t=ed165f3f-d0f6-524b-5279-86f8ee7d0e2d > HBaseConnectorITCase.testTableSourceSinkWithDDL fail on azure > - > > Key: FLINK-25343 > URL: https://issues.apache.org/jira/browse/FLINK-25343 > Project: Flink > Issue Type: Bug > Components: Connectors / HBase >Reporter: Yun Gao >Assignee: Jing Ge >Priority: Minor > Labels: test-stability > > {code:java} > Dec 15 16:54:20 at > org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > Dec 15 16:54:20 at > org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > Dec 15 16:54:20 at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > Dec 15 16:54:20 at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > Dec 15 16:54:20 at > org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) > Dec 15 16:54:20 at org.junit.rules.RunRules.evaluate(RunRules.java:20) > Dec 15 16:54:20 at > org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > Dec 15 16:54:20 at > org.junit.runners.ParentRunner.run(ParentRunner.java:413) > Dec 15 16:54:20 at > org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:367) > Dec 15 16:54:20 at > org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:274) > Dec 15 16:54:20 at > org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238) > Dec 15 16:54:20 at > org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:161) > Dec 15 16:54:20 at > org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:290) > Dec 15 16:54:20 at > 
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:242) > Dec 15 16:54:20 at > org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:121) > Dec 15 16:54:20 > Dec 15 16:54:21 > Dec 15 16:54:21 Results : > Dec 15 16:54:21 > Dec 15 16:54:21 Failed tests: > Dec 15 16:54:21 HBaseConnectorITCase.testTableSourceSinkWithDDL:371 > expected:<8> but was:<3> > Dec 15 16:54:21 > Dec 15 16:54:21 Tests run: 9, Failures: 1, Errors: 0, Skipped: 0 > Dec 15 16:54:21 > {code} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=28204&view=logs&j=d44f43ce-542c-597d-bf94-b0718c71e5e8&t=ed165f3f-d0f6-524b-5279-86f8ee7d0e2d&l=12375 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Resolved] (FLINK-25557) Introduce incremental/full checkpoint size stats
[ https://issues.apache.org/jira/browse/FLINK-25557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang resolved FLINK-25557. -- Fix Version/s: 1.15.0 Resolution: Fixed merged in master: d34653c2fbdbf962ec5db07cadde2bc6db14b24f b2968c76a611d9e8479ef9acff8079f99b34ab12 ca519f68fa9e1db56daee9435be5e30530da1e70 > Introduce incremental/full checkpoint size stats > > > Key: FLINK-25557 > URL: https://issues.apache.org/jira/browse/FLINK-25557 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Checkpointing, Runtime / Metrics, Runtime / > REST, Runtime / State Backends >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > Currently, the "checkpointed data size" would be incremental checkpoint size > if incremental checkpoint is enabled, otherwise full checkpoint size. This is > not friendly for users to know the exact incremental/full checkpoint size. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25816) Changelog keyed state backend would come across NPE during notification
Yun Tang created FLINK-25816: Summary: Changelog keyed state backend would come across NPE during notification Key: FLINK-25816 URL: https://issues.apache.org/jira/browse/FLINK-25816 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing, Runtime / State Backends Reporter: Yun Tang Assignee: Yun Tang Fix For: 1.15.0 Instance: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=30158&view=logs&j=a57e0635-3fad-5b08-57c7-a4142d7d6fa9&t=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7 {code:java} Caused by: java.lang.NullPointerException at org.apache.flink.state.changelog.ChangelogKeyedStateBackend.notifyCheckpointAborted(ChangelogKeyedStateBackend.java:536) at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.notifyCheckpointAborted(StreamOperatorStateHandler.java:298) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.notifyCheckpointAborted(AbstractStreamOperator.java:383) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointAborted(AbstractUdfStreamOperator.java:132) at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointAborted(RegularOperatorChain.java:158) at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:406) at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointAborted(SubtaskCheckpointCoordinatorImpl.java:352) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointAbortAsync$15(StreamTask.java:1327) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$17(StreamTask.java:1350) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:802) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:751) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) at java.lang.Thread.run(Thread.java:748) {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-15507) Activate local recovery for RocksDB backends by default
[ https://issues.apache.org/jira/browse/FLINK-15507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17481203#comment-17481203 ] Yun Tang commented on FLINK-15507: -- [~pnowojski] TaskStateManagerImpl would clean up local checkpoint on notifying the aborted checkpoint (see https://github.com/apache/flink/blob/42556522637bf7b6f54809afec08fa0899f3e8fd/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java#L239). > Activate local recovery for RocksDB backends by default > --- > > Key: FLINK-15507 > URL: https://issues.apache.org/jira/browse/FLINK-15507 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Reporter: Stephan Ewen >Assignee: Yuan Mei >Priority: Major > Labels: auto-deprioritized-critical, auto-unassigned, > pull-request-available > > For the RocksDB state backend, local recovery has no overhead when > incremental checkpoints are used. > It should be activated by default, because it greatly helps with recovery. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-25200) Implement duplicating for s3 filesystem
[ https://issues.apache.org/jira/browse/FLINK-25200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17481048#comment-17481048 ] Yun Tang commented on FLINK-25200: -- [~pnowojski] Actually, I mean we should implement Aliyun OSS duplication first, since OSS could achieve a much better performance improvement. These two tickets are certainly independent. > Implement duplicating for s3 filesystem > --- > > Key: FLINK-25200 > URL: https://issues.apache.org/jira/browse/FLINK-25200 > Project: Flink > Issue Type: Sub-task > Components: FileSystems >Reporter: Dawid Wysakowicz >Priority: Major > Fix For: 1.15.0 > > > We can use https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-25200) Implement duplicating for s3 filesystem
[ https://issues.apache.org/jira/browse/FLINK-25200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17480904#comment-17480904 ] Yun Tang commented on FLINK-25200: -- [~pnowojski] To my knowledge, S3 performs a server-side copy when #copyObject is called, which cannot be really fast. OSS, by contrast, uses [shallow copy|https://issues.apache.org/jira/browse/HADOOP-15323] for normal-format files under 32MB or multipart-format files, which can finish the copy within hundreds of milliseconds (as little as 10 milliseconds in the best case), so it deserves to be implemented first. > Implement duplicating for s3 filesystem > --- > > Key: FLINK-25200 > URL: https://issues.apache.org/jira/browse/FLINK-25200 > Project: Flink > Issue Type: Sub-task > Components: FileSystems >Reporter: Dawid Wysakowicz >Priority: Major > Fix For: 1.15.0 > > > We can use https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Closed] (FLINK-22578) deployment configuration page miss content navigate menu
[ https://issues.apache.org/jira/browse/FLINK-22578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang closed FLINK-22578. Fix Version/s: (was: 1.15.0) (was: 1.13.6) Resolution: Implemented > deployment configuration page miss content navigate menu > > > Key: FLINK-22578 > URL: https://issues.apache.org/jira/browse/FLINK-22578 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.13.0 >Reporter: 谢波 >Priority: Minor > Labels: auto-deprioritized-major > > [https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/] > The page is missing the content navigation menu, which makes it inconvenient to use. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Reopened] (FLINK-22578) deployment configuration page miss content navigate menu
[ https://issues.apache.org/jira/browse/FLINK-22578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang reopened FLINK-22578: -- > deployment configuration page miss content navigate menu > > > Key: FLINK-22578 > URL: https://issues.apache.org/jira/browse/FLINK-22578 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.13.0 >Reporter: 谢波 >Priority: Minor > Labels: auto-deprioritized-major > Fix For: 1.15.0, 1.13.6 > > > [https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/] > The page is missing the content navigation menu, which makes it inconvenient to use. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-25767) Translation of page 'Working with State' is incomplete
[ https://issues.apache.org/jira/browse/FLINK-25767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17480830#comment-17480830 ] Yun Tang commented on FLINK-25767: -- [~paul8263] already assigned to you, please go ahead. > Translation of page 'Working with State' is incomplete > -- > > Key: FLINK-25767 > URL: https://issues.apache.org/jira/browse/FLINK-25767 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.14.3 >Reporter: Yao Zhang >Assignee: Yao Zhang >Priority: Major > Fix For: 1.15.0 > > > The translation of page [Working with State | Apache > Flink|https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/datastream/fault-tolerance/state/] > is incomplete. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Assigned] (FLINK-25767) Translation of page 'Working with State' is incomplete
[ https://issues.apache.org/jira/browse/FLINK-25767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang reassigned FLINK-25767: Assignee: Yao Zhang > Translation of page 'Working with State' is incomplete > -- > > Key: FLINK-25767 > URL: https://issues.apache.org/jira/browse/FLINK-25767 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.14.3 >Reporter: Yao Zhang >Assignee: Yao Zhang >Priority: Major > Fix For: 1.15.0 > > > The translation of page [Working with State | Apache > Flink|https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/datastream/fault-tolerance/state/] > is incomplete. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Assigned] (FLINK-15507) Activate local recovery for RocksDB backends by default
[ https://issues.apache.org/jira/browse/FLINK-15507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang reassigned FLINK-15507: Assignee: Yuan Mei (was: Yuan Mei) > Activate local recovery for RocksDB backends by default > --- > > Key: FLINK-15507 > URL: https://issues.apache.org/jira/browse/FLINK-15507 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Reporter: Stephan Ewen >Assignee: Yuan Mei >Priority: Major > Labels: auto-deprioritized-critical, auto-unassigned, > pull-request-available > > For the RocksDB state backend, local recovery has no overhead when > incremental checkpoints are used. > It should be activated by default, because it greatly helps with recovery. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Assigned] (FLINK-15507) Activate local recovery for RocksDB backends by default
[ https://issues.apache.org/jira/browse/FLINK-15507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang reassigned FLINK-15507: Assignee: Yuan Mei > Activate local recovery for RocksDB backends by default > --- > > Key: FLINK-15507 > URL: https://issues.apache.org/jira/browse/FLINK-15507 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Reporter: Stephan Ewen >Assignee: Yuan Mei >Priority: Major > Labels: auto-deprioritized-critical, auto-unassigned, > pull-request-available > > For the RocksDB state backend, local recovery has no overhead when > incremental checkpoints are used. > It should be activated by default, because it greatly helps with recovery. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-15507) Activate local recovery for RocksDB backends by default
[ https://issues.apache.org/jira/browse/FLINK-15507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang updated FLINK-15507: - Fix Version/s: (was: 1.15.0) > Activate local recovery for RocksDB backends by default > --- > > Key: FLINK-15507 > URL: https://issues.apache.org/jira/browse/FLINK-15507 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Reporter: Stephan Ewen >Priority: Major > Labels: auto-deprioritized-critical, auto-unassigned, > pull-request-available > > For the RocksDB state backend, local recovery has no overhead when > incremental checkpoints are used. > It should be activated by default, because it greatly helps with recovery. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-25200) Implement duplicating for s3 filesystem
[ https://issues.apache.org/jira/browse/FLINK-25200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17479139#comment-17479139 ] Yun Tang commented on FLINK-25200: -- [~akalashnikov] I think we should run tests that mimic how Flink will use the DFS for checkpoints. That's why I think we should read existing local files and upload them to S3. And certainly, we should test it on AWS machines instead of on a local machine. > Implement duplicating for s3 filesystem > --- > > Key: FLINK-25200 > URL: https://issues.apache.org/jira/browse/FLINK-25200 > Project: Flink > Issue Type: Sub-task > Components: FileSystems >Reporter: Dawid Wysakowicz >Priority: Major > Fix For: 1.15.0 > > > We can use https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-24210) Window related serailizer should not return 0 as its serialized length
[ https://issues.apache.org/jira/browse/FLINK-24210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang updated FLINK-24210: - Fix Version/s: 1.15.0 > Window related serailizer should not return 0 as its serialized length > -- > > Key: FLINK-24210 > URL: https://issues.apache.org/jira/browse/FLINK-24210 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System, Runtime / State Backends >Reporter: Yun Tang >Assignee: Jinzhong Li >Priority: Major > Labels: pull-request-available, stale-assigned > Fix For: 1.15.0 > > > The TimeWindow serializer returns 0 as its serialized length, which is > certainly not correct. > {code:java} > public static class Serializer extends TypeSerializerSingleton { > > @Override > public int getLength() { > return 0; > } > } > {code} > The current namespace serializer in the state backend does not depend on this > interface, so no obvious bug has ever been reported. > Moreover, this bug also occurs in other window-related serializers. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Resolved] (FLINK-24210) Window related serailizer should not return 0 as its serialized length
[ https://issues.apache.org/jira/browse/FLINK-24210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang resolved FLINK-24210. -- Resolution: Fixed Merged in master: 573d2b038f8fd5dc1177733ee3087c5b6c847fa4 > Window related serailizer should not return 0 as its serialized length > -- > > Key: FLINK-24210 > URL: https://issues.apache.org/jira/browse/FLINK-24210 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System, Runtime / State Backends >Reporter: Yun Tang >Assignee: Jinzhong Li >Priority: Major > Labels: pull-request-available, stale-assigned > > The TimeWindow serializer returns 0 as its serialized length, which is > certainly not correct. > {code:java} > public static class Serializer extends TypeSerializerSingleton { > > @Override > public int getLength() { > return 0; > } > } > {code} > The current namespace serializer in the state backend does not depend on this > interface, so no obvious bug has ever been reported. > Moreover, this bug also occurs in other window-related serializers. -- This message was sent by Atlassian Jira (v8.20.1#820001)
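Why 0 cannot be the right value can be checked without Flink on the classpath. The sketch below assumes that a time window is written as two long values (start, then end); that layout, and the names WindowLengthSketch, TIME_WINDOW_LENGTH and serializeTimeWindow, are assumptions made for this illustration and are not taken from the ticket. A fixed-length serializer is expected to report the number of bytes it always writes, so the reported length should match what the write path actually produces.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class WindowLengthSketch {

    /** Length a fixed-size serializer for this assumed layout would report: two longs. */
    static final int TIME_WINDOW_LENGTH = Long.BYTES + Long.BYTES;

    /** Writes a window as (start, end) and returns the raw bytes. The layout is an assumption. */
    static byte[] serializeTimeWindow(long start, long end) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(start);
            out.writeLong(end);
        } catch (IOException e) {
            // An in-memory stream should not fail, but DataOutputStream declares the exception.
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        int written = serializeTimeWindow(0L, 60_000L).length;
        System.out.println("bytes written = " + written
                + ", reported length = " + TIME_WINDOW_LENGTH);
    }
}
```

A check of this shape (serialize one record, compare the byte count with the reported length) is cheap enough to run against every fixed-length serializer in a test suite.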
[jira] [Commented] (FLINK-25200) Implement duplicating for s3 filesystem
[ https://issues.apache.org/jira/browse/FLINK-25200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17478674#comment-17478674 ] Yun Tang commented on FLINK-25200: -- For general RocksDB incremental checkpoints, we should also test uploading 64MB files vs. copying 64MB files, as RocksDB SST files are 64MB by default. Moreover, I noticed that the write throughput can reach 100MB/s; from my understanding, that looks really good for a cloud environment. Did you ensure that the test logic reads local files and then writes them to remote S3, instead of writing directly to remote S3? > Implement duplicating for s3 filesystem > --- > > Key: FLINK-25200 > URL: https://issues.apache.org/jira/browse/FLINK-25200 > Project: Flink > Issue Type: Sub-task > Components: FileSystems >Reporter: Dawid Wysakowicz >Priority: Major > Fix For: 1.15.0 > > > We can use https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Closed] (FLINK-25494) Duplicate element serializer during DefaultOperatorStateBackendSnapshotStrategy#syncPrepareResources
[ https://issues.apache.org/jira/browse/FLINK-25494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang closed FLINK-25494. Resolution: Invalid > Duplicate element serializer during > DefaultOperatorStateBackendSnapshotStrategy#syncPrepareResources > > > Key: FLINK-25494 > URL: https://issues.apache.org/jira/browse/FLINK-25494 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0, 1.13.6, 1.14.4 > > > Currently, during > DefaultOperatorStateBackendSnapshotStrategy#syncPrepareResources, it will > copy the array list serializer via PartitionableListState#deepCopy. However, > it just initializes another ArrayListSerializer and does not duplicate the internal > state serializer: > > See "{{internalListCopySerializer}}": > > {code:java} > private PartitionableListState( > RegisteredOperatorStateBackendMetaInfo stateMetaInfo, ArrayList > internalList) { > this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo); > this.internalList = Preconditions.checkNotNull(internalList); > this.internalListCopySerializer = > new > ArrayListSerializer<>(stateMetaInfo.getPartitionStateSerializer()); > } {code} > > This would cause unexpected problems when the Kryo serializer is used. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Assigned] (FLINK-25580) Minor improvements for RocksDBIncrementalCheckpointUtils
[ https://issues.apache.org/jira/browse/FLINK-25580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang reassigned FLINK-25580: Assignee: Aitozi > Minor improvements for RocksDBIncrementalCheckpointUtils > > > Key: FLINK-25580 > URL: https://issues.apache.org/jira/browse/FLINK-25580 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Reporter: Aitozi >Assignee: Aitozi >Priority: Minor > Labels: pull-request-available > Fix For: 1.15.0 > > > {{RocksDBIncrementalCheckpointUtils}} uses > {{intersectGroup.getNumberOfKeyGroups() * overlapFraction * overlapFraction}} > to evaluate the score of the state handle. Its meaning is not very explicit. > The rule for choosing the handle should be: > 1. Choose the bigger intersect group range first. > 2. If the intersect group ranges are the same, use the handle with the higher > overlap fraction, which means fewer iterator delete operations. > After we introduce the {{deleteRange}} api, this rule can still work. -- This message was sent by Atlassian Jira (v8.20.1#820001)
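The two-step rule in the ticket amounts to a lexicographic comparison, which can be sketched as follows. Candidate, BY_RULE and choose are hypothetical names for this illustration; the real handle types and the way the intersect range and overlap fraction are computed in RocksDBIncrementalCheckpointUtils are not reproduced here.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class HandleChoiceSketch {

    /** Hypothetical summary of one restored state handle. */
    static final class Candidate {
        final String name;
        final int intersectKeyGroups;  // size of the intersection with the target range
        final double overlapFraction;  // share of the handle's own range that is needed

        Candidate(String name, int intersectKeyGroups, double overlapFraction) {
            this.name = name;
            this.intersectKeyGroups = intersectKeyGroups;
            this.overlapFraction = overlapFraction;
        }
    }

    /** Rule 1: bigger intersect range wins. Rule 2: on a tie, higher overlap fraction wins. */
    static final Comparator<Candidate> BY_RULE =
            Comparator.comparingInt((Candidate c) -> c.intersectKeyGroups)
                    .thenComparingDouble(c -> c.overlapFraction);

    /** Returns the preferred handle; throws NoSuchElementException for an empty list. */
    static Candidate choose(List<Candidate> candidates) {
        return Collections.max(candidates, BY_RULE);
    }

    public static void main(String[] args) {
        List<Candidate> candidates = Arrays.asList(
                new Candidate("wide-but-partial", 8, 0.5),
                new Candidate("narrow-but-exact", 4, 1.0));
        System.out.println("chosen: " + choose(candidates).name);
    }
}
```

For these two sample handles the quoted product formula gives 8 * 0.5 * 0.5 = 2 and 4 * 1.0 * 1.0 = 4, so a single blended score and the explicit two-step rule can disagree, which is one reason to spell the rule out.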
[jira] [Updated] (FLINK-25580) Minor improvements for RocksDBIncrementalCheckpointUtils
[ https://issues.apache.org/jira/browse/FLINK-25580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang updated FLINK-25580: - Fix Version/s: 1.15.0 > Minor improvements for RocksDBIncrementalCheckpointUtils > > > Key: FLINK-25580 > URL: https://issues.apache.org/jira/browse/FLINK-25580 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Reporter: Aitozi >Priority: Minor > Labels: pull-request-available > Fix For: 1.15.0 > > > {{RocksDBIncrementalCheckpointUtils}} uses > {{intersectGroup.getNumberOfKeyGroups() * overlapFraction * overlapFraction}} > to evaluate the score of the state handle. Its meaning is not very explicit. > The rule for choosing the handle should be: > 1. Choose the bigger intersect group range first. > 2. If the intersect group ranges are the same, use the handle with the higher > overlap fraction, which means fewer iterator delete operations. > After we introduce the {{deleteRange}} api, this rule can still work. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Resolved] (FLINK-25580) Minor improvements for RocksDBIncrementalCheckpointUtils
[ https://issues.apache.org/jira/browse/FLINK-25580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang resolved FLINK-25580. -- Resolution: Fixed Merged in master: b7fd63b41500dbfeadbf2ee48e1a83d77cd1259b > Minor improvements for RocksDBIncrementalCheckpointUtils > > > Key: FLINK-25580 > URL: https://issues.apache.org/jira/browse/FLINK-25580 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Reporter: Aitozi >Priority: Minor > Labels: pull-request-available > > {{RocksDBIncrementalCheckpointUtils}} uses > {{intersectGroup.getNumberOfKeyGroups() * overlapFraction * overlapFraction}} > to evaluate the score of the state handle. Its meaning is not very explicit. > The rule for choosing the handle should be: > 1. Choose the bigger intersect group range first. > 2. If the intersect group ranges are the same, use the handle with the higher > overlap fraction, which means fewer iterator delete operations. > After we introduce the {{deleteRange}} api, this rule can still work. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Closed] (FLINK-24797) GlobalWindow.Serializer#getLength() provides a wrong length.
[ https://issues.apache.org/jira/browse/FLINK-24797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang closed FLINK-24797. Resolution: Information Provided [~bx123] Thanks for your effort in finding this problem. However, this problem is known and is a duplicate of FLINK-24210. > GlobalWindow.Serializer#getLength() provides a wrong length. > > > Key: FLINK-24797 > URL: https://issues.apache.org/jira/browse/FLINK-24797 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System >Affects Versions: 1.12.0, 1.13.0, 1.14.0 >Reporter: bx123 >Priority: Minor > > The returned value in the current version is 0, while the > serialize() method actually writes one byte, so the correct value seems to be 1. > > I am wondering whether any Serializer exists that should return length > 0? -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-23378) The last timer of ContinuousProcessingTimeTrigger cannot fire
[ https://issues.apache.org/jira/browse/FLINK-23378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang updated FLINK-23378: - Fix Version/s: (was: 1.14.3) > The last timer of ContinuousProcessingTimeTrigger cannot fire > -- > > Key: FLINK-23378 > URL: https://issues.apache.org/jira/browse/FLINK-23378 > Project: Flink > Issue Type: Bug >Affects Versions: 1.12.3 >Reporter: frey >Priority: Major > Labels: pull-request-available > > When using a tumbling window with ProcessingTime semantics and changing the default trigger to ContinuousProcessingTimeTrigger, the time of the last timer equals the start time of the next window, so the computation for the last timer can never be triggered. > This could be fixed by returning FIRE in onProcessingTime when time=window.maxTimestamp(). -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-23378) The last timer of ContinuousProcessingTimeTrigger cannot fire
[ https://issues.apache.org/jira/browse/FLINK-23378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17474365#comment-17474365 ] Yun Tang commented on FLINK-23378: -- [~frey] Please make sure the title and description of this ticket are in English. I see the PR has not been merged, so why has this ticket been closed as "Fixed"? I will reopen this ticket and remove the fix version field. > The last timer of ContinuousProcessingTimeTrigger cannot fire > -- > > Key: FLINK-23378 > URL: https://issues.apache.org/jira/browse/FLINK-23378 > Project: Flink > Issue Type: Bug >Affects Versions: 1.12.3 >Reporter: frey >Priority: Major > Labels: pull-request-available > Fix For: 1.14.3 > > > When using a tumbling window with ProcessingTime semantics and changing the default trigger to ContinuousProcessingTimeTrigger, the time of the last timer equals the start time of the next window, so the computation for the last timer can never be triggered. > This could be fixed by returning FIRE in onProcessingTime when time=window.maxTimestamp(). -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Reopened] (FLINK-23378) The last timer of ContinuousProcessingTimeTrigger cannot be triggered
[ https://issues.apache.org/jira/browse/FLINK-23378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang reopened FLINK-23378: -- > The last timer of ContinuousProcessingTimeTrigger cannot be triggered > -- > > Key: FLINK-23378 > URL: https://issues.apache.org/jira/browse/FLINK-23378 > Project: Flink > Issue Type: Bug >Affects Versions: 1.12.3 >Reporter: frey >Priority: Major > Labels: pull-request-available > Fix For: 1.14.3 > > > When using a tumbling window with ProcessingTime semantics and replacing the default trigger with ContinuousProcessingTimeTrigger, the timestamp of the last timer equals the start time of the next window, so the computation for the last timer is never triggered. > This can be fixed by returning FIRE in onProcessingTime when time == window.maxTimestamp(). -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Resolved] (FLINK-25601) Update 'state.backend' in flink-conf.yaml
[ https://issues.apache.org/jira/browse/FLINK-25601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang resolved FLINK-25601. -- Resolution: Fixed merged in master: 41e96d2488e4caba17eef5172bb5ae493eb9c742 > Update 'state.backend' in flink-conf.yaml > - > > Key: FLINK-25601 > URL: https://issues.apache.org/jira/browse/FLINK-25601 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Affects Versions: 1.14.2 >Reporter: Ada Wong >Assignee: Ada Wong >Priority: Major > Labels: pull-request-available > > The value and comments of 'state.backend' in flink-conf.yaml are deprecated. > {code:java} > # Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the > # . > # > # state.backend: filesystem{code} > We should update them to the following. > > {code:java} > # Supported backends are 'hashmap', 'rocksdb', or the > # . > # > # state.backend: hashmap {code} > > > -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Assigned] (FLINK-25478) Same materialized state handle should not register multi times
[ https://issues.apache.org/jira/browse/FLINK-25478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang reassigned FLINK-25478: Assignee: Yun Tang > Same materialized state handle should not register multi times > -- > > Key: FLINK-25478 > URL: https://issues.apache.org/jira/browse/FLINK-25478 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Checkpointing, Runtime / State Backends >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Critical > Fix For: 1.15.0 > > > Currently, changelog materialization calls the RocksDB state backend's > snapshot method to generate an {{IncrementalRemoteKeyedStateHandle}} as > ChangelogStateBackendHandleImpl's materialized artifact. Until the next > materialization, it always reports the same > {{IncrementalRemoteKeyedStateHandle}} as before. > Registering this handle the 1st time is fine. However, when the > {{IncrementalRemoteKeyedStateHandle}} is registered a 2nd time (via > {{ChangelogStateBackendHandleImpl#registerSharedStates}}), it > discards the private state artifacts without checking the registered reference: > IncrementalRemoteKeyedStateHandle: > {code:java} > public void discardState() throws Exception { > try { > StateUtil.bestEffortDiscardAllStateObjects(privateState.values()); > } catch (Exception e) { > LOG.warn("Could not properly discard misc file states.", e); > } > } > {code} > Thus, this deletes the private state (such as RocksDB's MANIFEST), and > once restored, the job would report a FileNotFoundException. > > {code:java} > Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught > unexpected exception. 
> at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:403) > at > org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:477) > > at > org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:90) > > at > org.apache.flink.state.changelog.ChangelogStateBackend.lambda$createKeyedStateBackend$1(ChangelogStateBackend.java:153) > at > org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:68) > at > org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:221) > > at > org.apache.flink.state.changelog.ChangelogStateBackend.createKeyedStateBackend(ChangelogStateBackend.java:145) > > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328) > > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) > > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) > > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) > > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) > > ... 
10 more > Caused by: java.io.FileNotFoundException: x > at > org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.open(PluginFileSystemFactory.java:127) > > at > org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68) > > at > org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:127) > > at > org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:110) > at > org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:49) > > at > java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626) > ~[?:1.8.0_102] > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147) > ~[?:1.8.0_102] > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622) > ~[?:1.8.0_102] > ... 1 more {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-25601) Update 'state.backend' in flink-conf.yaml
[ https://issues.apache.org/jira/browse/FLINK-25601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17472503#comment-17472503 ] Yun Tang commented on FLINK-25601: -- [~ana4] Already assigned to you, please go ahead. > Update 'state.backend' in flink-conf.yaml > - > > Key: FLINK-25601 > URL: https://issues.apache.org/jira/browse/FLINK-25601 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Affects Versions: 1.14.2 >Reporter: Ada Wong >Assignee: Ada Wong >Priority: Major > > The value and comments of 'state.backend' in flink-conf.yaml are deprecated. > {code:java} > # Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the > # . > # > # state.backend: filesystem{code} > We should update them to the following. > > {code:java} > # Supported backends are 'hashmap', 'rocksdb', or the > # . > # > # state.backend: hashmap {code} > > > -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Assigned] (FLINK-25601) Update 'state.backend' in flink-conf.yaml
[ https://issues.apache.org/jira/browse/FLINK-25601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang reassigned FLINK-25601: Assignee: Ada Wong > Update 'state.backend' in flink-conf.yaml > - > > Key: FLINK-25601 > URL: https://issues.apache.org/jira/browse/FLINK-25601 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Affects Versions: 1.14.2 >Reporter: Ada Wong >Assignee: Ada Wong >Priority: Major > > The value and comments of 'state.backend' in flink-conf.yaml are deprecated. > {code:java} > # Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the > # . > # > # state.backend: filesystem{code} > We should update them to the following. > > {code:java} > # Supported backends are 'hashmap', 'rocksdb', or the > # . > # > # state.backend: hashmap {code} > > > -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-25557) Introduce incremental/full checkpoint size stats
[ https://issues.apache.org/jira/browse/FLINK-25557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17471713#comment-17471713 ] Yun Tang commented on FLINK-25557: -- [~liufangqi] Thanks for your attention. I will introduce the new attributes "incremental checkpoint size" and "full checkpoint size" to replace the original "checkpointed state size" shown on the web UI and in the REST APIs. This part of the code has already been implemented in my private Flink, and I think you could help review the PR so that we improve it together as a community. > Introduce incremental/full checkpoint size stats > > > Key: FLINK-25557 > URL: https://issues.apache.org/jira/browse/FLINK-25557 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing, Runtime / Metrics, Runtime / > REST, Runtime / State Backends >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Major > > Currently, the "checkpointed data size" is the incremental checkpoint size > if incremental checkpointing is enabled, and the full checkpoint size otherwise. This > makes it hard for users to know the exact incremental and full checkpoint sizes. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Resolved] (FLINK-25536) Minor Fix: Adjust the order of variable declaration and comment in StateAssignmentOperation
[ https://issues.apache.org/jira/browse/FLINK-25536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang resolved FLINK-25536. -- Fix Version/s: 1.15.0 Resolution: Fixed merged in master: 34720dc4c883b70114668fdcb9d7d1190edfbbf9 > Minor Fix: Adjust the order of variable declaration and comment in > StateAssignmentOperation > --- > > Key: FLINK-25536 > URL: https://issues.apache.org/jira/browse/FLINK-25536 > Project: Flink > Issue Type: Improvement >Reporter: Junfan Zhang >Assignee: Junfan Zhang >Priority: Minor > Labels: pull-request-available > Fix For: 1.15.0 > > -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-25360) Add State Desc to CheckpointMetadata
[ https://issues.apache.org/jira/browse/FLINK-25360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17470355#comment-17470355 ] Yun Tang commented on FLINK-25360: -- [~liufangqi] I suggest creating a design doc in Google Docs first. After several rounds of review, let's create a FLIP and launch a discussion. > Add State Desc to CheckpointMetadata > > > Key: FLINK-25360 > URL: https://issues.apache.org/jira/browse/FLINK-25360 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing, Runtime / State Backends >Reporter: 刘方奇 >Priority: Major > Attachments: image-2021-12-17-20-01-42-423.png > > > Currently we can't get the State Descriptor info from the checkpoint metadata. For example, > if we use the state-processor-api to load state and then rewrite it, we can't > use the state flexibly. > There may be other cases where we need the State Descriptor, so can we add this > info? -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25557) Introduce incremental/full checkpoint size stats
Yun Tang created FLINK-25557: Summary: Introduce incremental/full checkpoint size stats Key: FLINK-25557 URL: https://issues.apache.org/jira/browse/FLINK-25557 Project: Flink Issue Type: Improvement Components: Runtime / Checkpointing, Runtime / Metrics, Runtime / REST, Runtime / State Backends Reporter: Yun Tang Assignee: Yun Tang Currently, the "checkpointed data size" is the incremental checkpoint size if incremental checkpointing is enabled, and the full checkpoint size otherwise. This makes it hard for users to know the exact incremental and full checkpoint sizes. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Assigned] (FLINK-25536) Minor Fix: Adjust the order of variable declaration and comment in StateAssignmentOperation
[ https://issues.apache.org/jira/browse/FLINK-25536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang reassigned FLINK-25536: Assignee: Junfan Zhang > Minor Fix: Adjust the order of variable declaration and comment in > StateAssignmentOperation > --- > > Key: FLINK-25536 > URL: https://issues.apache.org/jira/browse/FLINK-25536 > Project: Flink > Issue Type: Improvement >Reporter: Junfan Zhang >Assignee: Junfan Zhang >Priority: Minor > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-25536) Minor Fix: Adjust the order of variable declaration and comment in StateAssignmentOperation
[ https://issues.apache.org/jira/browse/FLINK-25536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17469694#comment-17469694 ] Yun Tang commented on FLINK-25536: -- [~zuston] I think the PR is easy to review. Please remember to fill in the description. > Minor Fix: Adjust the order of variable declaration and comment in > StateAssignmentOperation > --- > > Key: FLINK-25536 > URL: https://issues.apache.org/jira/browse/FLINK-25536 > Project: Flink > Issue Type: Improvement >Reporter: Junfan Zhang >Priority: Minor > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-25360) Add State Desc to CheckpointMetadata
[ https://issues.apache.org/jira/browse/FLINK-25360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17469693#comment-17469693 ] Yun Tang commented on FLINK-25360: -- [~liufangqi] I think we should at least write a discussion email about persisting additional information in the state descriptor. For the 1st problem, I think you could at least provide a proposal for how to persist the state metadata in the checkpoint metadata. In my view, this could require adding some additional interfaces. > Add State Desc to CheckpointMetadata > > > Key: FLINK-25360 > URL: https://issues.apache.org/jira/browse/FLINK-25360 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing, Runtime / State Backends >Reporter: 刘方奇 >Priority: Major > Attachments: image-2021-12-17-20-01-42-423.png > > > Currently we can't get the State Descriptor info from the checkpoint metadata. For example, > if we use the state-processor-api to load state and then rewrite it, we can't > use the state flexibly. > There may be other cases where we need the State Descriptor, so can we add this > info? -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-25458) Support local recovery
[ https://issues.apache.org/jira/browse/FLINK-25458?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang updated FLINK-25458: - Description: Current changelog state-backend implementation cannot work well with local recovery enabled. The current periodic materialization calls the state backend's snapshot method with a materialization id. However, the current local state management relies on the checkpoint id for storing, confirming and discarding. The gap between them breaks local recovery. was:Current changelog state-backend implementation cannot work well with local recovery enabled. > Support local recovery > -- > > Key: FLINK-25458 > URL: https://issues.apache.org/jira/browse/FLINK-25458 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Checkpointing, Runtime / State Backends >Reporter: Yun Tang >Priority: Major > > Current changelog state-backend implementation cannot work well with local > recovery enabled. > The current periodic materialization calls the state backend's snapshot method with > a materialization id. However, the current local state management relies on the > checkpoint id for storing, confirming and discarding. The gap between them > breaks local recovery. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25524) If enabled changelog, RocksDB incremental checkpoint would always be full
Yun Tang created FLINK-25524: Summary: If enabled changelog, RocksDB incremental checkpoint would always be full Key: FLINK-25524 URL: https://issues.apache.org/jira/browse/FLINK-25524 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing, Runtime / State Backends Reporter: Yun Tang Once changelog is enabled, the RocksDB incremental checkpoint is only executed during materialization. During this phase, it uses the {{materialization id}} as the checkpoint id for the RocksDB state backend's snapshot method. However, the current incremental checkpoint mechanism depends heavily on the checkpoint id, and {{SortedMap> uploadedStateIDs}}, keyed by checkpoint id within {{RocksIncrementalSnapshotStrategy}}, is the core of incremental checkpointing. Once we are notified that the previous checkpoint has completed, the uploaded state IDs of that checkpoint are removed, so we cannot get proper checkpoint information on the next RocksDBKeyedStateBackend#snapshot. That is to say, we will always upload all RocksDB artifacts. -- This message was sent by Atlassian Jira (v8.20.1#820001)
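The interaction described in this ticket can be modelled with a small sorted map keyed by snapshot id. This is a simplified sketch under stated assumptions and not the actual {{RocksIncrementalSnapshotStrategy}}: `UploadTracker`, `snapshot` and `notifyComplete` are hypothetical names, and the pruning rule (drop every entry older than the completed id) is an assumption made for illustration.

```java
import java.util.Collections;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical model of "uploaded files per snapshot id" bookkeeping.
final class UploadTracker {
    private final SortedMap<Long, Set<String>> uploaded = new TreeMap<>();
    private long lastCompletedId = -1L;

    // Records the files of this snapshot and returns the base file set of the
    // last completed snapshot, or null when no base exists (full upload needed).
    Set<String> snapshot(long snapshotId, Set<String> files) {
        Set<String> base = uploaded.get(lastCompletedId);
        uploaded.put(snapshotId, files);
        return base;
    }

    // Assumed pruning rule: forget everything older than the completed id.
    void notifyComplete(long completedId) {
        uploaded.keySet().removeIf(id -> id < completedId);
        lastCompletedId = completedId;
    }

    // Snapshots use materialization ids (1, 2) while completion uses checkpoint id 10.
    static boolean mismatchedIdsLoseBase() {
        UploadTracker tracker = new UploadTracker();
        tracker.snapshot(1L, Collections.singleton("a.sst"));
        tracker.notifyComplete(10L);
        return tracker.snapshot(2L, Collections.singleton("b.sst")) == null;
    }

    // Snapshots and completion share the same id space, so the base survives.
    static boolean matchingIdsKeepBase() {
        UploadTracker tracker = new UploadTracker();
        tracker.snapshot(10L, Collections.singleton("a.sst"));
        tracker.notifyComplete(10L);
        Set<String> base = tracker.snapshot(11L, Collections.singleton("b.sst"));
        return base != null && base.contains("a.sst");
    }
}
```

In this model, when snapshots are recorded under one id space and completions arrive in another, the lookup for a base never succeeds, so every snapshot degenerates into a full upload.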
[jira] [Commented] (FLINK-25478) Same materialized state handle should not register multi times
[ https://issues.apache.org/jira/browse/FLINK-25478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17468426#comment-17468426 ] Yun Tang commented on FLINK-25478: -- I created a branch to reproduce this problem [https://github.com/Myasuka/flink/tree/FLINK-25478-test] , you can use {{ResumeCheckpointManuallyITCase.java}} to reproduce this. > Same materialized state handle should not register multi times > -- > > Key: FLINK-25478 > URL: https://issues.apache.org/jira/browse/FLINK-25478 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Checkpointing, Runtime / State Backends >Reporter: Yun Tang >Priority: Critical > Fix For: 1.15.0 > > > Currently, changelog materialization calls the RocksDB state backend's > snapshot method to generate an {{IncrementalRemoteKeyedStateHandle}} as > ChangelogStateBackendHandleImpl's materialized artifact. Until the next > materialization, it always reports the same > {{IncrementalRemoteKeyedStateHandle}} as before. > Registering this handle the 1st time is fine. However, when the > {{IncrementalRemoteKeyedStateHandle}} is registered a 2nd time (via > {{ChangelogStateBackendHandleImpl#registerSharedStates}}), it > discards the private state artifacts without checking the registered reference: > IncrementalRemoteKeyedStateHandle: > {code:java} > public void discardState() throws Exception { > try { > StateUtil.bestEffortDiscardAllStateObjects(privateState.values()); > } catch (Exception e) { > LOG.warn("Could not properly discard misc file states.", e); > } > } > {code} > Thus, this deletes the private state (such as RocksDB's MANIFEST), and > once restored, the job would report a FileNotFoundException. > > {code:java} > Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught > unexpected exception. 
> at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:403) > at > org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:477) > > at > org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:90) > > at > org.apache.flink.state.changelog.ChangelogStateBackend.lambda$createKeyedStateBackend$1(ChangelogStateBackend.java:153) > at > org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:68) > at > org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:221) > > at > org.apache.flink.state.changelog.ChangelogStateBackend.createKeyedStateBackend(ChangelogStateBackend.java:145) > > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328) > > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) > > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) > > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) > > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) > > ... 
10 more > Caused by: java.io.FileNotFoundException: x > at > org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.open(PluginFileSystemFactory.java:127) > > at > org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68) > > at > org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:127) > > at > org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:110) > at > org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:49) > > at > java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626) > ~[?:1.8.0_102] > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147) > ~[?:1.8.0_102] > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622) > ~[?:1.8.0_102] > ... 1 more {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-25477) The directory structure of the State Backends document is not standardized
[ https://issues.apache.org/jira/browse/FLINK-25477?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang updated FLINK-25477: - Fix Version/s: 1.14.3 > The directory structure of the State Backends document is not standardized > -- > > Key: FLINK-25477 > URL: https://issues.apache.org/jira/browse/FLINK-25477 > Project: Flink > Issue Type: Bug > Components: Documentation, Runtime / State Backends >Reporter: Hangxiang Yu >Assignee: Hangxiang Yu >Priority: Minor > Labels: pull-request-available > Fix For: 1.15.0, 1.14.3 > > Attachments: image-2021-12-29-16-56-24-657.png > > > The State Backends document uses multiple first-level headings. > This may cause the directory structure to be displayed incorrectly. > As the picture shows, the two titles are missing from the table of contents on > the right. > > !image-2021-12-29-16-56-24-657.png|width=522,height=230! -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Closed] (FLINK-25477) The directory structure of the State Backends document is not standardized
[ https://issues.apache.org/jira/browse/FLINK-25477?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang closed FLINK-25477. Merged in master: 332eee0a0d6edb9c9aa455d182f79977496af756 release-1.14: 7263d977150c7d7fb880a637e2d65996cca7ea3a > The directory structure of the State Backends document is not standardized > -- > > Key: FLINK-25477 > URL: https://issues.apache.org/jira/browse/FLINK-25477 > Project: Flink > Issue Type: Bug > Components: Documentation, Runtime / State Backends >Reporter: Hangxiang Yu >Assignee: Hangxiang Yu >Priority: Minor > Labels: pull-request-available > Fix For: 1.15.0 > > Attachments: image-2021-12-29-16-56-24-657.png > > > The State Backends document uses multiple first-level headings. > This may cause the directory structure to be displayed incorrectly. > As the picture shows, the two titles are missing from the table of contents on > the right. > > !image-2021-12-29-16-56-24-657.png|width=522,height=230! -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-25429) Avoid to close output streams twice during uploading changelogs
[ https://issues.apache.org/jira/browse/FLINK-25429?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang updated FLINK-25429: - Issue Type: Bug (was: Improvement) > Avoid to close output streams twice during uploading changelogs > --- > > Key: FLINK-25429 > URL: https://issues.apache.org/jira/browse/FLINK-25429 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > The current uploader implementation closes {{stream}} and {{fsStream}} one > by one, which leads to {{fsStream}} being closed twice. > {code:java} > try (FSDataOutputStream fsStream = fileSystem.create(path, > NO_OVERWRITE)) { > fsStream.write(compression ? 1 : 0); > try (OutputStreamWithPos stream = wrap(fsStream); ) { > final Map> tasksOffsets > = new HashMap<>(); > for (UploadTask task : tasks) { > tasksOffsets.put(task, format.write(stream, > task.changeSets)); > } > FileStateHandle handle = new FileStateHandle(path, > stream.getPos()); > // WARN: streams have to be closed before returning the > results > // otherwise JM may receive invalid handles > return new LocalResult(tasksOffsets, handle); > } > } > {code} > Not all file systems support closing the same stream twice. -- This message was sent by Atlassian Jira (v8.20.1#820001)
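The double close described above follows directly from nesting two try-with-resources blocks around a wrapper that also closes its inner stream. The sketch below reproduces the effect with plain JDK streams. `StrictStream`, `PosStream` and `DoubleCloseDemo` are hypothetical stand-ins, not Flink classes, and letting the wrapper be the only owner is just one possible way to avoid the second close, not necessarily the change made for this ticket.

```java
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical stream that, like some file system streams, rejects a second close().
final class StrictStream extends OutputStream {
    private boolean closed;

    @Override
    public void write(int b) {
        // Data is discarded; only the close behaviour matters here.
    }

    @Override
    public void close() throws IOException {
        if (closed) {
            throw new IOException("stream already closed");
        }
        closed = true;
    }
}

// Hypothetical wrapper whose close() also closes the wrapped stream.
final class PosStream extends OutputStream {
    private final OutputStream inner;

    PosStream(OutputStream inner) {
        this.inner = inner;
    }

    @Override
    public void write(int b) throws IOException {
        inner.write(b);
    }

    @Override
    public void close() throws IOException {
        inner.close();
    }
}

final class DoubleCloseDemo {
    // Both streams are managed resources: the wrapper closes the inner stream,
    // then the outer try closes it again, which StrictStream rejects.
    static boolean nestedTryFails() {
        try (StrictStream fsStream = new StrictStream()) {
            try (PosStream stream = new PosStream(fsStream)) {
                stream.write(1);
            }
            return false;
        } catch (IOException e) {
            return true;
        }
    }

    // Only the wrapper is a managed resource, so the inner stream is closed once.
    static boolean singleOwnerSucceeds() {
        try (PosStream stream = new PosStream(new StrictStream())) {
            stream.write(1);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```

The JDK's `Closeable` contract asks implementations to make repeated `close()` calls harmless, but as the ticket notes, not every file system stream honours that, so the calling code cannot rely on it.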
[jira] [Updated] (FLINK-25494) Duplicate element serializer during DefaultOperatorStateBackendSnapshotStrategy#syncPrepareResources
[ https://issues.apache.org/jira/browse/FLINK-25494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang updated FLINK-25494: - Description: Currently, during DefaultOperatorStateBackendSnapshotStrategy#syncPrepareResources, it will copy the array list serializer via PartitionableListState#deepCopy. However, it just initialize another ArrayListSerializer and not duplicate the internal state serializer: See "{{{}internalListCopySerializer{}}}": {code:java} private PartitionableListState( RegisteredOperatorStateBackendMetaInfo stateMetaInfo, ArrayList internalList) { this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo); this.internalList = Preconditions.checkNotNull(internalList); this.internalListCopySerializer = new ArrayListSerializer<>(stateMetaInfo.getPartitionStateSerializer()); } {code} This would cause unexpected problem with the usage of kryo serializer. was: Currently, during DefaultOperatorStateBackendSnapshotStrategy#syncPrepareResources, it will copy the array list serializer via PartitionableListState#deepCopy. However, it just initialize another ArrayListSerializer and not duplicate the internal state serializer: See "{{{}internalListCopySerializer{}}}": {code:java} private PartitionableListState( RegisteredOperatorStateBackendMetaInfo stateMetaInfo, ArrayList internalList) { this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo); this.internalList = Preconditions.checkNotNull(internalList); this.internalListCopySerializer = new ArrayListSerializer<>(stateMetaInfo.getPartitionStateSerializer()); } {code} This would cause unexpected problem with usage of kryo serializer. 
> Duplicate element serializer during > DefaultOperatorStateBackendSnapshotStrategy#syncPrepareResources > > > Key: FLINK-25494 > URL: https://issues.apache.org/jira/browse/FLINK-25494 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Major > Fix For: 1.15.0, 1.13.6, 1.14.3 > > > Currently, during > DefaultOperatorStateBackendSnapshotStrategy#syncPrepareResources, it will > copy the array list serializer via PartitionableListState#deepCopy. However, > it just initialize another ArrayListSerializer and not duplicate the internal > state serializer: > > See "{{{}internalListCopySerializer{}}}": > > {code:java} > private PartitionableListState( > RegisteredOperatorStateBackendMetaInfo stateMetaInfo, ArrayList > internalList) { > this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo); > this.internalList = Preconditions.checkNotNull(internalList); > this.internalListCopySerializer = > new > ArrayListSerializer<>(stateMetaInfo.getPartitionStateSerializer()); > } {code} > > This would cause unexpected problem with the usage of kryo serializer. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-25494) Duplicate element serializer during DefaultOperatorStateBackendSnapshotStrategy#syncPrepareResources
[ https://issues.apache.org/jira/browse/FLINK-25494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang updated FLINK-25494: - Description: Currently, during DefaultOperatorStateBackendSnapshotStrategy#syncPrepareResources, it will copy the array list serializer via PartitionableListState#deepCopy. However, it just initialize another ArrayListSerializer and not duplicate the internal state serializer: See "{{{}internalListCopySerializer{}}}": {code:java} private PartitionableListState( RegisteredOperatorStateBackendMetaInfo stateMetaInfo, ArrayList internalList) { this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo); this.internalList = Preconditions.checkNotNull(internalList); this.internalListCopySerializer = new ArrayListSerializer<>(stateMetaInfo.getPartitionStateSerializer()); } {code} This would cause unexpected problem with usage of kryo serializer. was: Currently, during DefaultOperatorStateBackendSnapshotStrategy#syncPrepareResources, it will copy the array list serializer via PartitionableListState#deepCopy. 
However, it just initialize another ArrayListSerializer and not duplicate the internal state serializer: See "{{internalListCopySerializer}}": {code:java} private PartitionableListState( RegisteredOperatorStateBackendMetaInfo stateMetaInfo, ArrayList internalList) { this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo); this.internalList = Preconditions.checkNotNull(internalList); this.internalListCopySerializer = new ArrayListSerializer<>(stateMetaInfo.getPartitionStateSerializer()); } {code} > Duplicate element serializer during > DefaultOperatorStateBackendSnapshotStrategy#syncPrepareResources > > > Key: FLINK-25494 > URL: https://issues.apache.org/jira/browse/FLINK-25494 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Major > Fix For: 1.15.0, 1.13.6, 1.14.3 > > > Currently, during > DefaultOperatorStateBackendSnapshotStrategy#syncPrepareResources, it will > copy the array list serializer via PartitionableListState#deepCopy. However, > it just initialize another ArrayListSerializer and not duplicate the internal > state serializer: > > See "{{{}internalListCopySerializer{}}}": > > {code:java} > private PartitionableListState( > RegisteredOperatorStateBackendMetaInfo stateMetaInfo, ArrayList > internalList) { > this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo); > this.internalList = Preconditions.checkNotNull(internalList); > this.internalListCopySerializer = > new > ArrayListSerializer<>(stateMetaInfo.getPartitionStateSerializer()); > } {code} > > This would cause unexpected problem with usage of kryo serializer. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25494) Duplicate element serializer during DefaultOperatorStateBackendSnapshotStrategy#syncPrepareResources
Yun Tang created FLINK-25494: Summary: Duplicate element serializer during DefaultOperatorStateBackendSnapshotStrategy#syncPrepareResources Key: FLINK-25494 URL: https://issues.apache.org/jira/browse/FLINK-25494 Project: Flink Issue Type: Bug Components: Runtime / State Backends Reporter: Yun Tang Assignee: Yun Tang Fix For: 1.15.0, 1.13.6, 1.14.3 Currently, during DefaultOperatorStateBackendSnapshotStrategy#syncPrepareResources, it will copy the array list serializer via PartitionableListState#deepCopy. However, it just initializes another ArrayListSerializer and does not duplicate the internal state serializer: See "{{internalListCopySerializer}}": {code:java} private PartitionableListState( RegisteredOperatorStateBackendMetaInfo stateMetaInfo, ArrayList internalList) { this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo); this.internalList = Preconditions.checkNotNull(internalList); this.internalListCopySerializer = new ArrayListSerializer<>(stateMetaInfo.getPartitionStateSerializer()); } {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
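The difference between wrapping the same element serializer again and duplicating it can be shown with a tiny model. This is a sketch under stated assumptions, not Flink code: `ElementSerializer`, `StatefulSerializer` and `ListSerializer` are hypothetical types that only mimic the idea of a `duplicate()` method for serializers holding mutable internal state.

```java
// Hypothetical minimal serializer contract with a duplicate() method.
interface ElementSerializer {
    ElementSerializer duplicate();
}

// Hypothetical stateful serializer: it owns a scratch buffer, so one instance
// must not be used by two threads at the same time.
final class StatefulSerializer implements ElementSerializer {
    private final byte[] scratch = new byte[16];

    @Override
    public ElementSerializer duplicate() {
        // A fresh instance gets its own scratch buffer.
        return new StatefulSerializer();
    }
}

// Hypothetical list serializer that wraps an element serializer.
final class ListSerializer {
    final ElementSerializer element;

    ListSerializer(ElementSerializer element) {
        this.element = element;
    }

    // Creates a new wrapper but shares the element serializer instance,
    // which is the pattern the ticket describes as problematic.
    ListSerializer shallowCopy() {
        return new ListSerializer(element);
    }

    // Creates a new wrapper around a duplicated element serializer.
    ListSerializer deepCopy() {
        return new ListSerializer(element.duplicate());
    }
}
```

With the shallow variant, a snapshot thread and the task thread end up sharing one stateful element serializer, while the deep variant gives each side its own instance.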
[jira] [Closed] (FLINK-10954) Hardlink from files of previous local stored state might cross devices
[ https://issues.apache.org/jira/browse/FLINK-10954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang closed FLINK-10954. Resolution: Information Provided After FLINK-25468, we can at least not fail the job. > Hardlink from files of previous local stored state might cross devices > -- > > Key: FLINK-10954 > URL: https://issues.apache.org/jira/browse/FLINK-10954 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.6.2 >Reporter: Yun Tang >Priority: Minor > Labels: auto-deprioritized-critical, auto-deprioritized-major, > auto-unassigned > > Currently, local recovery's base directories are initialized from > '{{io.tmp.dirs}}' if parameter '{{taskmanager.state.local.root-dirs}}' is not > set. In a YARN environment, the tmp dirs are replaced by its '{{LOCAL_DIRS}}', > which might consist of directories on different devices, such as > /dump/1/nm-local-dir, /dump/2/nm-local-dir. The local directory for RocksDB > is initialized from IOManager's spillingDirectories, which might be located on > a different device from local recovery's folder. However, hard links > across devices are not allowed, so the exception below is thrown: > {code:java} > java.nio.file.FileSystemException: target -> souce: Invalid cross-device link > {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
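The cross-device failure described above can be tolerated in plain Java by falling back to a copy when the hard link cannot be created. The sketch below illustrates that general pattern with java.nio.file only; it is not necessarily what FLINK-25468 does, and the class and method names (LinkOrCopy, linkOrCopy) are hypothetical.

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.FileSystemException;
import java.nio.file.Files;
import java.nio.file.Path;

final class LinkOrCopy {
    /**
     * Tries to hard-link target to existing; if the file system refuses
     * (for example with "Invalid cross-device link"), copies the file instead.
     *
     * @return true if a hard link was created, false if the file was copied
     */
    static boolean linkOrCopy(Path existing, Path target) throws IOException {
        try {
            Files.createLink(target, existing);
            return true;
        } catch (FileAlreadyExistsException e) {
            throw e; // a copy would not help here, so surface the problem
        } catch (FileSystemException | UnsupportedOperationException e) {
            // Hard links are impossible across devices or unsupported: pay for a copy.
            Files.copy(existing, target);
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("link-or-copy");
        Path source = Files.write(dir.resolve("000001.sst"), new byte[] {1, 2, 3});
        Path target = dir.resolve("000001-linked.sst");
        System.out.println("hard link created: " + linkOrCopy(source, target));
    }
}
```

The trade-off is that a copy costs I/O and disk space that a hard link avoids, so a fallback of this kind keeps the job running at the price of a slower local snapshot.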
[jira] [Updated] (FLINK-25478) Same materialized state handle should not register multi times
[ https://issues.apache.org/jira/browse/FLINK-25478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang updated FLINK-25478: - Summary: Same materialized state handle should not register multi times (was: Same materialized state handle should register multi times) > Same materialized state handle should not register multi times > -- > > Key: FLINK-25478 > URL: https://issues.apache.org/jira/browse/FLINK-25478 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Checkpointing, Runtime / State Backends >Reporter: Yun Tang >Priority: Critical > Fix For: 1.15.0 > > > Currently, changelog materialization calls the RocksDB state backend's > snapshot method to generate {{IncrementalRemoteKeyedStateHandle}} as > ChangelogStateBackendHandleImpl's materialized artifacts. Before the next > materialization, it will always report the same > {{IncrementalRemoteKeyedStateHandle}} as before. > It's fine to register this for the 1st time. However, when > {{IncrementalRemoteKeyedStateHandle}} is registered for the 2nd time (via > {{ChangelogStateBackendHandleImpl#registerSharedStates}}), it will > discard the private state artifacts without checking the registered reference: > IncrementalRemoteKeyedStateHandle: > {code:java} > public void discardState() throws Exception { > try { > StateUtil.bestEffortDiscardAllStateObjects(privateState.values()); > } catch (Exception e) { > LOG.warn("Could not properly discard misc file states.", e); > } > } > {code} > Thus, this would delete the private state (such as RocksDB's MANIFEST), and > on restore, the job would report FileNotFoundException. > > {code:java} > Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught > unexpected exception. 
> at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:403) > at > org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:477) > > at > org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:90) > > at > org.apache.flink.state.changelog.ChangelogStateBackend.lambda$createKeyedStateBackend$1(ChangelogStateBackend.java:153) > at > org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:68) > at > org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:221) > > at > org.apache.flink.state.changelog.ChangelogStateBackend.createKeyedStateBackend(ChangelogStateBackend.java:145) > > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328) > > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) > > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) > > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) > > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) > > ... 
10 more > Caused by: java.io.FileNotFoundException: x > at > org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.open(PluginFileSystemFactory.java:127) > > at > org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68) > > at > org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:127) > > at > org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:110) > at > org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:49) > > at > java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626) > ~[?:1.8.0_102] > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147) > ~[?:1.8.0_102] > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622) > ~[?:1.8.0_102] > ... 1 more {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-25478) Same materialized state handle should register multi times
[ https://issues.apache.org/jira/browse/FLINK-25478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang updated FLINK-25478: - Parent: FLINK-21352 Issue Type: Sub-task (was: Bug) > Same materialized state handle should register multi times > -- > > Key: FLINK-25478 > URL: https://issues.apache.org/jira/browse/FLINK-25478 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Checkpointing, Runtime / State Backends >Reporter: Yun Tang >Priority: Critical > Fix For: 1.15.0 > > > Currently, changelog materialization calls the RocksDB state backend's > snapshot method to generate {{IncrementalRemoteKeyedStateHandle}} as > ChangelogStateBackendHandleImpl's materialized artifacts. Before the next > materialization, it will always report the same > {{IncrementalRemoteKeyedStateHandle}} as before. > It's fine to register this for the 1st time. However, when > {{IncrementalRemoteKeyedStateHandle}} is registered for the 2nd time (via > {{ChangelogStateBackendHandleImpl#registerSharedStates}}), it will > discard the private state artifacts without checking the registered reference: > IncrementalRemoteKeyedStateHandle: > {code:java} > public void discardState() throws Exception { > try { > StateUtil.bestEffortDiscardAllStateObjects(privateState.values()); > } catch (Exception e) { > LOG.warn("Could not properly discard misc file states.", e); > } > } > {code} > Thus, this would delete the private state (such as RocksDB's MANIFEST), and > on restore, the job would report FileNotFoundException. > > {code:java} > Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught > unexpected exception. 
> at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:403) > at > org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:477) > > at > org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:90) > > at > org.apache.flink.state.changelog.ChangelogStateBackend.lambda$createKeyedStateBackend$1(ChangelogStateBackend.java:153) > at > org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:68) > at > org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:221) > > at > org.apache.flink.state.changelog.ChangelogStateBackend.createKeyedStateBackend(ChangelogStateBackend.java:145) > > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328) > > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) > > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) > > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) > > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) > > ... 
10 more > Caused by: java.io.FileNotFoundException: x > at > org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.open(PluginFileSystemFactory.java:127) > > at > org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68) > > at > org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:127) > > at > org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:110) > at > org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:49) > > at > java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626) > ~[?:1.8.0_102] > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147) > ~[?:1.8.0_102] > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622) > ~[?:1.8.0_102] > ... 1 more {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-25478) Same materialized state handle should register multi times
[ https://issues.apache.org/jira/browse/FLINK-25478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang updated FLINK-25478: - Description: Currently, changelog materialization calls the RocksDB state backend's snapshot method to generate {{IncrementalRemoteKeyedStateHandle}} as ChangelogStateBackendHandleImpl's materialized artifacts. Before the next materialization, it will always report the same {{IncrementalRemoteKeyedStateHandle}} as before. It's fine to register this for the 1st time. However, when {{IncrementalRemoteKeyedStateHandle}} is registered for the 2nd time (via {{ChangelogStateBackendHandleImpl#registerSharedStates}}), it will discard the private state artifacts without checking the registered reference: IncrementalRemoteKeyedStateHandle: {code:java} public void discardState() throws Exception { try { StateUtil.bestEffortDiscardAllStateObjects(privateState.values()); } catch (Exception e) { LOG.warn("Could not properly discard misc file states.", e); } } {code} Thus, this would delete the private state (such as RocksDB's MANIFEST), and on restore, the job would report FileNotFoundException. {code:java} Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception. 
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:403) at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:477) at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:90) at org.apache.flink.state.changelog.ChangelogStateBackend.lambda$createKeyedStateBackend$1(ChangelogStateBackend.java:153) at org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:68) at org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:221) at org.apache.flink.state.changelog.ChangelogStateBackend.createKeyedStateBackend(ChangelogStateBackend.java:145) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) ... 
10 more Caused by: java.io.FileNotFoundException: x at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.open(PluginFileSystemFactory.java:127) at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68) at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:127) at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:110) at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:49) at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626) ~[?:1.8.0_102] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147) ~[?:1.8.0_102] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622) ~[?:1.8.0_102] ... 1 more {code} was: Currently, changelog materialization would call RocksDB state backend's snapshot method to generate {{IncrementalRemoteKeyedStateHandle}} as ChangelogStateBackendHandleImpl's materialized artifacts. And before next materialization, it will always report the same {{IncrementalRemoteKeyedStateHandle}} as before. It's fine to register this for the 1st time. However, for the 2nd time to register {{IncrementalRemoteKeyedStateHandle}} (via {{{}ChangelogStateBackendHandleImpl#registerSharedStates{}}}), it will discard the private state artifacts without check the register reference: IncrementalRemoteKeyedStateHandle: {code:java} public void discardState() throws Exception { try { StateUtil.bestEffortDiscardAllStateObjects(privateState.values()); } catch (Exception e) { LOG.warn("Could not properly discard misc file states.", e); } } {code} Thus, this would delete the private state (such as RocksDB's MAINFEST), and once restore, job would not report Fi
[jira] [Updated] (FLINK-25478) Same materialized state handle should register multi times
[ https://issues.apache.org/jira/browse/FLINK-25478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang updated FLINK-25478: - Summary: Same materialized state handle should register multi times (was: Changelog materialization with incremental checkpoint could cause checkpointed data lost) > Same materialized state handle should register multi times > -- > > Key: FLINK-25478 > URL: https://issues.apache.org/jira/browse/FLINK-25478 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing, Runtime / State Backends >Reporter: Yun Tang >Priority: Critical > Fix For: 1.15.0 > > > Currently, changelog materialization calls the RocksDB state backend's > snapshot method to generate {{IncrementalRemoteKeyedStateHandle}} as > ChangelogStateBackendHandleImpl's materialized artifacts. Before the next > materialization, it will always report the same > {{IncrementalRemoteKeyedStateHandle}} as before. > It's fine to register this for the 1st time. However, when > {{IncrementalRemoteKeyedStateHandle}} is registered for the 2nd time (via > {{ChangelogStateBackendHandleImpl#registerSharedStates}}), it will > discard the private state artifacts without checking the registered reference: > IncrementalRemoteKeyedStateHandle: > {code:java} > public void discardState() throws Exception { > try { > StateUtil.bestEffortDiscardAllStateObjects(privateState.values()); > } catch (Exception e) { > LOG.warn("Could not properly discard misc file states.", e); > } > } > {code} > Thus, this would delete the private state (such as RocksDB's MANIFEST), and > on restore, the job would report FileNotFoundException. > > Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught > unexpected exception. 
> at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:403) > at > org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:477) > > at > org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:90) > > at > org.apache.flink.state.changelog.ChangelogStateBackend.lambda$createKeyedStateBackend$1(ChangelogStateBackend.java:153) > at > org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:68) > at > org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:221) > > at > org.apache.flink.state.changelog.ChangelogStateBackend.createKeyedStateBackend(ChangelogStateBackend.java:145) > > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328) > > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) > > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) > > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) > > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) > > ... 
10 more > Caused by: java.io.FileNotFoundException: x > at > org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.open(PluginFileSystemFactory.java:127) > > at > org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68) > > at > org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:127) > > at > org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:110) > at > org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:49) > > at > java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626) > ~[?:1.8.0_102] > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147) > ~[?:1.8.0_102] > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622) > ~[?:1.8.0_102] > ... 1 more -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-25478) Changelog materialization with incremental checkpoint could cause checkpointed data lost
[ https://issues.apache.org/jira/browse/FLINK-25478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang updated FLINK-25478: - Description: Currently, changelog materialization calls the RocksDB state backend's snapshot method to generate {{IncrementalRemoteKeyedStateHandle}} as ChangelogStateBackendHandleImpl's materialized artifacts. Before the next materialization, it will always report the same {{IncrementalRemoteKeyedStateHandle}} as before. It's fine to register this for the 1st time. However, when {{IncrementalRemoteKeyedStateHandle}} is registered for the 2nd time (via {{ChangelogStateBackendHandleImpl#registerSharedStates}}), it will discard the private state artifacts without checking the registered reference: IncrementalRemoteKeyedStateHandle: {code:java} public void discardState() throws Exception { try { StateUtil.bestEffortDiscardAllStateObjects(privateState.values()); } catch (Exception e) { LOG.warn("Could not properly discard misc file states.", e); } } {code} Thus, this would delete the private state (such as RocksDB's MANIFEST), and on restore, the job would report FileNotFoundException. Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception. 
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:403) at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:477) at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:90) at org.apache.flink.state.changelog.ChangelogStateBackend.lambda$createKeyedStateBackend$1(ChangelogStateBackend.java:153) at org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:68) at org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:221) at org.apache.flink.state.changelog.ChangelogStateBackend.createKeyedStateBackend(ChangelogStateBackend.java:145) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) ... 
10 more Caused by: java.io.FileNotFoundException: x at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.open(PluginFileSystemFactory.java:127) at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68) at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:127) at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:110) at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:49) at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626) ~[?:1.8.0_102] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147) ~[?:1.8.0_102] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622) ~[?:1.8.0_102] ... 1 more was: Currently, changelog materialization would call RocksDB state backend's snapshot method to generate {{IncrementalRemoteKeyedStateHandle}} as ChangelogStateBackendHandleImpl's materialized artifacts. And before next materialization, it will always report the same {{IncrementalRemoteKeyedStateHandle}} as before. It's fine to register this for the 1st time. However, for the 2nd time to register {{IncrementalRemoteKeyedStateHandle}} (via {{ChangelogStateBackendHandleImpl#registerSharedStates}}), it will discard the private state artifacts without check the register reference: IncrementalRemoteKeyedStateHandle: {code:java} public void discardState() throws Exception { try { StateUtil.bestEffortDiscardAllStateObjects(privateState.values()); } catch (Exception e) { LOG.warn("Could not properly discard misc file states.", e); } } {code} Thus, this would delete the private state (such as RocksDB's MAINFEST), and once restore, job would not report
[jira] [Assigned] (FLINK-25477) The directory structure of the State Backends document is not standardized
[ https://issues.apache.org/jira/browse/FLINK-25477?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang reassigned FLINK-25477: Assignee: Hangxiang Yu > The directory structure of the State Backends document is not standardized > -- > > Key: FLINK-25477 > URL: https://issues.apache.org/jira/browse/FLINK-25477 > Project: Flink > Issue Type: Bug > Components: Documentation, Runtime / State Backends >Reporter: Hangxiang Yu >Assignee: Hangxiang Yu >Priority: Minor > Fix For: 1.15.0 > > Attachments: image-2021-12-29-16-56-24-657.png > > > The State Backends document uses multiple first-level headings. > This may cause the directory structure to be displayed incorrectly. > As the picture shows, the two titles are missing from the table of contents on > the right. > > !image-2021-12-29-16-56-24-657.png|width=522,height=230! -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25479) Changlog materialization with incremental checkpoint cannot work well in local tests
Yun Tang created FLINK-25479: Summary: Changlog materialization with incremental checkpoint cannot work well in local tests Key: FLINK-25479 URL: https://issues.apache.org/jira/browse/FLINK-25479 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing, Runtime / State Backends, Tests Reporter: Yun Tang Fix For: 1.15.0 Currently, changelog materialization calls the RocksDB state backend's snapshot method to generate {{IncrementalRemoteKeyedStateHandle}} as ChangelogStateBackendHandleImpl's materialized artifacts. Before the next materialization, it will always report the same {{IncrementalRemoteKeyedStateHandle}} as before. For local tests, TM reports the {{IncrementalRemoteKeyedStateHandle}} to JM via a local {{LocalRpcInvocation}}. However, {{LocalRpcInvocation}} does not serialize or deserialize the message, so JM and TM share the same object: once we register the {{IncrementalRemoteKeyedStateHandle}} on the JM side, the {{sharedStateRegistry}} is also set on the instance located on the TM side. For the 2nd checkpoint, TM reports the same {{IncrementalRemoteKeyedStateHandle}}, now carrying the {{sharedStateRegistry}}, to JM. It then throws an exception because it already contains a {{sharedStateRegistry}}: IncrementalRemoteKeyedStateHandle {code:java} public void registerSharedStates(SharedStateRegistry stateRegistry, long checkpointID) { Preconditions.checkState( sharedStateRegistry != stateRegistry, "The state handle has already registered its shared states to the given registry."); } {code} This bug does not occur in a distributed environment, because {{IncrementalRemoteKeyedStateHandle}} is serialized there and {{sharedStateRegistry}} is marked as {{transient}}. -- This message was sent by Atlassian Jira (v8.20.1#820001)
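The difference between local and distributed runs described above comes down to how Java serialization treats transient fields. The sketch below reproduces it with plain java.io serialization; ToyStateHandle and TransientRegistryDemo are hypothetical names, and the registration check only mimics the checkState quoted in the issue.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical handle with a transient back-reference to the registry it was registered in. */
final class ToyStateHandle implements Serializable {
    private static final long serialVersionUID = 1L;

    private transient Object sharedStateRegistry; // dropped by Java serialization

    void registerSharedStates(Object registry) {
        if (sharedStateRegistry == registry) {
            throw new IllegalStateException("The state handle has already registered its shared states.");
        }
        sharedStateRegistry = registry;
    }

    /** Simulates sending the handle over a real RPC: serialize, then deserialize. */
    ToyStateHandle roundTrip() throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(this);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (ToyStateHandle) in.readObject();
        }
    }
}

final class TransientRegistryDemo {
    /** Reports the same TM-side handle twice and tells whether the second registration fails. */
    static boolean secondRegistrationFails(boolean serializeBetween) throws Exception {
        Object jmRegistry = new Object();
        ToyStateHandle tmHandle = new ToyStateHandle();
        try {
            for (int checkpoint = 1; checkpoint <= 2; checkpoint++) {
                // Without serialization, JM mutates the very object TM keeps reporting.
                ToyStateHandle received = serializeBetween ? tmHandle.roundTrip() : tmHandle;
                received.registerSharedStates(jmRegistry);
            }
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("local (by reference): fails = " + secondRegistrationFails(false));
        System.out.println("distributed (serialized): fails = " + secondRegistrationFails(true));
    }
}
```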
[jira] [Created] (FLINK-25478) Changelog materialization with incremental checkpoint could cause checkpointed data lost
Yun Tang created FLINK-25478: Summary: Changelog materialization with incremental checkpoint could cause checkpointed data lost Key: FLINK-25478 URL: https://issues.apache.org/jira/browse/FLINK-25478 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing, Runtime / State Backends Reporter: Yun Tang Fix For: 1.15.0 Currently, changelog materialization calls the RocksDB state backend's snapshot method to generate {{IncrementalRemoteKeyedStateHandle}} as ChangelogStateBackendHandleImpl's materialized artifacts. Before the next materialization, it will always report the same {{IncrementalRemoteKeyedStateHandle}} as before. It's fine to register this for the 1st time. However, when {{IncrementalRemoteKeyedStateHandle}} is registered for the 2nd time (via {{ChangelogStateBackendHandleImpl#registerSharedStates}}), it will discard the private state artifacts without checking the registered reference: IncrementalRemoteKeyedStateHandle: {code:java} public void discardState() throws Exception { try { StateUtil.bestEffortDiscardAllStateObjects(privateState.values()); } catch (Exception e) { LOG.warn("Could not properly discard misc file states.", e); } } {code} Thus, this would delete the private state (such as RocksDB's MANIFEST), and on restore, the job would report FileNotFoundException. -- This message was sent by Atlassian Jira (v8.20.1#820001)
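One way to picture the missing check is a registry that counts references and only discards an incoming handle when it is a different object from the one already registered. The sketch below is a toy model under that assumption: ToyHandle and ToyRegistry are hypothetical, this is not Flink's SharedStateRegistry, and it is not necessarily the fix that was applied for this issue.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical handle whose discard() stands in for deleting files such as MANIFEST. */
final class ToyHandle {
    boolean discarded;

    void discard() {
        discarded = true;
    }
}

/** Hypothetical registry that tolerates the same handle being registered several times. */
final class ToyRegistry {
    private static final class Entry {
        final ToyHandle handle;
        int refCount = 1;

        Entry(ToyHandle handle) {
            this.handle = handle;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();

    void register(String key, ToyHandle handle) {
        Entry entry = entries.get(key);
        if (entry == null) {
            entries.put(key, new Entry(handle));
            return;
        }
        entry.refCount++;
        // Only a different object under the same key is a redundant copy that may be dropped.
        if (entry.handle != handle) {
            handle.discard();
        }
    }

    void unregister(String key) {
        Entry entry = entries.get(key);
        if (entry != null && --entry.refCount == 0) {
            entries.remove(key);
            entry.handle.discard(); // last reference gone, now deletion is safe
        }
    }

    public static void main(String[] args) {
        ToyRegistry registry = new ToyRegistry();
        ToyHandle materialized = new ToyHandle();
        registry.register("materialized-1", materialized); // 1st checkpoint
        registry.register("materialized-1", materialized); // 2nd checkpoint, same handle
        System.out.println("discarded after 2nd registration: " + materialized.discarded);
    }
}
```

The identity comparison is the important part: comparing by key alone would treat the re-reported handle as a duplicate and delete files that the earlier checkpoint still needs.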
[jira] [Resolved] (FLINK-25465) Correct the logic of materizating wrapped changelog state
[ https://issues.apache.org/jira/browse/FLINK-25465?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang resolved FLINK-25465. -- Resolution: Fixed merged in master: 57391962d5119816beebec56e78a034130e0bfdd > Correct the logic of materizating wrapped changelog state > - > > Key: FLINK-25465 > URL: https://issues.apache.org/jira/browse/FLINK-25465 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > {{ChangelogKeyedStateBackend#keyValueStatesByName}} may store wrapped > state, such as TTL state or latency tracking state. If the state is wrapped, > the check in initMaterialization throws an exception: > {code:java} > for (InternalKvState changelogState : keyValueStatesByName.values()) > { > checkState(changelogState instanceof ChangelogState); > ((ChangelogState) changelogState).resetWritingMetaFlag(); > } > {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Resolved] (FLINK-25446) Avoid sanity check on read bytes on DataInputStream#read(byte[])
[ https://issues.apache.org/jira/browse/FLINK-25446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang resolved FLINK-25446. -- Resolution: Fixed Merged master: c0f46ef324c35b3ed7813c74931ab9cb589896f7 release-1.14: 24016b83eec53ba7124d651201daa580323b80fc > Avoid sanity check on read bytes on DataInputStream#read(byte[]) > > > Key: FLINK-25446 > URL: https://issues.apache.org/jira/browse/FLINK-25446 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing, Runtime / State Backends >Affects Versions: 1.14.2 >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Critical > Labels: pull-request-available > Fix For: 1.15.0, 1.14.3 > > > The current changelog-related code checks whether the number of bytes read > equals the target size: > {code:java} > checkState(size == input.read(bytes)); > {code} > However, this is not correct, as the Javadoc says: {{"An attempt is made to > read as many as len bytes, but a smaller number may be read, possibly zero."}} -- This message was sent by Atlassian Jira (v8.20.1#820001)
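The Javadoc caveat quoted above is easy to reproduce. The sketch below wraps a byte array in a stream that returns at most one byte per call, which is legal behaviour for any InputStream, and compares DataInputStream#read(byte[]) with DataInputStream#readFully(byte[]). ReadFullyDemo and trickle are hypothetical names, and readFully is shown as one common remedy, not necessarily the change merged for this issue.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

final class ReadFullyDemo {
    /** A stream that hands out at most one byte per read call, as a slow remote source might. */
    static InputStream trickle(byte[] data) {
        return new ByteArrayInputStream(data) {
            @Override
            public synchronized int read(byte[] b, int off, int len) {
                return super.read(b, off, Math.min(len, 1));
            }
        };
    }

    /** A single read(byte[]) call: may return fewer bytes than requested. */
    static int readOnce(byte[] data) throws IOException {
        byte[] target = new byte[data.length];
        try (DataInputStream in = new DataInputStream(trickle(data))) {
            return in.read(target);
        }
    }

    /** readFully keeps reading until the buffer is full or throws EOFException. */
    static byte[] readAll(byte[] data) throws IOException {
        byte[] target = new byte[data.length];
        try (DataInputStream in = new DataInputStream(trickle(data))) {
            in.readFully(target);
        }
        return target;
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = {10, 20, 30, 40};
        System.out.println("bytes returned by one read(): " + readOnce(payload));
        System.out.println("bytes filled by readFully(): " + readAll(payload).length);
    }
}
```

A check such as size == input.read(bytes) would fail against the trickling stream even though no data is missing, which is the false alarm the issue describes.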
[jira] [Commented] (FLINK-5279) Improve error message when trying to access keyed state in non-keyed operator
[ https://issues.apache.org/jira/browse/FLINK-5279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17466060#comment-17466060 ] Yun Tang commented on FLINK-5279: - The current error message is "{{Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.}}", and I think we could still improve this message by adding the state descriptor name. > Improve error message when trying to access keyed state in non-keyed operator > - > > Key: FLINK-5279 > URL: https://issues.apache.org/jira/browse/FLINK-5279 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Affects Versions: 1.1.3 >Reporter: Ufuk Celebi >Priority: Minor > Labels: auto-deprioritized-major, stale-minor > > When trying to access keyed state in a non-keyed operator, the error message > is not very helpful. You get a trace like this: > {code} > java.lang.RuntimeException: Error while getting state > ... > Caused by: java.lang.RuntimeException: State key serializer has not been > configured in the config. This operation cannot use partitioned state. > {code} > It would be helpful to users if this were more explicit, stating that > the API can only be used on keyed streams, etc. > If this applies to the current master as well, we should fix it there, too. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Closed] (FLINK-21218) "state.checkpoints.dir" should be required only when "execution.checkpointing.interval" is specified
[ https://issues.apache.org/jira/browse/FLINK-21218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang closed FLINK-21218. Resolution: Information Provided Since the state backend loader already uses the new EmbeddedRocksDBStateBackend API in place of RocksDBStateBackend, this problem no longer occurs. > "state.checkpoints.dir" should be required only when > "execution.checkpointing.interval" is specified > > > Key: FLINK-21218 > URL: https://issues.apache.org/jira/browse/FLINK-21218 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Reporter: Dongwon Kim >Priority: Minor > > Users have to specify "state.checkpoints.dir" even when using a state > backend only as an unreliable per-key state store. > Thread in user ML : > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Setting-quot-unreliable-quot-RocksDB-state-backend-w-o-quot-execution-checkpointing-interval-quot-ans-td41003.html -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-25426) UnalignedCheckpointRescaleITCase.shouldRescaleUnalignedCheckpoint fails on AZP because it cannot allocate enough network buffers
[ https://issues.apache.org/jira/browse/FLINK-25426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17466044#comment-17466044 ] Yun Tang commented on FLINK-25426: -- Another instance: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=28634&view=logs&j=5c8e7682-d68f-54d1-16a2-a09310218a49&t=86f654fa-ab48-5c1a-25f4-7e7f6afb9bba > UnalignedCheckpointRescaleITCase.shouldRescaleUnalignedCheckpoint fails on > AZP because it cannot allocate enough network buffers > > > Key: FLINK-25426 > URL: https://issues.apache.org/jira/browse/FLINK-25426 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.15.0 >Reporter: Till Rohrmann >Assignee: Anton Kalashnikov >Priority: Blocker > Labels: test-stability > Fix For: 1.15.0 > > > The test > {{UnalignedCheckpointRescaleITCase.shouldRescaleUnalignedCheckpoint}} fails > with > {code} > 2021-12-23T02:54:46.2862342Z Dec 23 02:54:46 [ERROR] > UnalignedCheckpointRescaleITCase.shouldRescaleUnalignedCheckpoint Time > elapsed: 2.992 s <<< ERROR! > 2021-12-23T02:54:46.2865774Z Dec 23 02:54:46 java.lang.OutOfMemoryError: > Could not allocate enough memory segments for NetworkBufferPool (required > (Mb): 64, allocated (Mb): 14, missing (Mb): 50). Cause: Direct buffer memory. > The direct out-of-memory error has occurred. This can mean two things: either > job(s) require(s) a larger size of JVM direct memory or there is a direct > memory leak. The direct memory can be allocated by user code or some of its > dependencies. In this case 'taskmanager.memory.task.off-heap.size' > configuration option should be increased. Flink framework and its > dependencies also consume the direct memory, mostly for network > communication. The most of network memory is managed by Flink and should not > result in out-of-memory error. 
In certain special cases, in particular for > jobs with high parallelism, the framework may require more direct memory > which is not managed by Flink. In this case > 'taskmanager.memory.framework.off-heap.size' configuration option should be > increased. If the error persists then there is probably a direct memory leak > in user code or some of its dependencies which has to be investigated and > fixed. The task executor has to be shutdown... > 2021-12-23T02:54:46.2868239Z Dec 23 02:54:46 at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.(NetworkBufferPool.java:138) > 2021-12-23T02:54:46.2868975Z Dec 23 02:54:46 at > org.apache.flink.runtime.io.network.NettyShuffleServiceFactory.createNettyShuffleEnvironment(NettyShuffleServiceFactory.java:140) > 2021-12-23T02:54:46.2869771Z Dec 23 02:54:46 at > org.apache.flink.runtime.io.network.NettyShuffleServiceFactory.createNettyShuffleEnvironment(NettyShuffleServiceFactory.java:94) > 2021-12-23T02:54:46.2870550Z Dec 23 02:54:46 at > org.apache.flink.runtime.io.network.NettyShuffleServiceFactory.createShuffleEnvironment(NettyShuffleServiceFactory.java:79) > 2021-12-23T02:54:46.2871312Z Dec 23 02:54:46 at > org.apache.flink.runtime.io.network.NettyShuffleServiceFactory.createShuffleEnvironment(NettyShuffleServiceFactory.java:58) > 2021-12-23T02:54:46.2872062Z Dec 23 02:54:46 at > org.apache.flink.runtime.taskexecutor.TaskManagerServices.createShuffleEnvironment(TaskManagerServices.java:414) > 2021-12-23T02:54:46.2872767Z Dec 23 02:54:46 at > org.apache.flink.runtime.taskexecutor.TaskManagerServices.fromConfiguration(TaskManagerServices.java:282) > 2021-12-23T02:54:46.2873436Z Dec 23 02:54:46 at > org.apache.flink.runtime.taskexecutor.TaskManagerRunner.startTaskManager(TaskManagerRunner.java:523) > 2021-12-23T02:54:46.2877615Z Dec 23 02:54:46 at > org.apache.flink.runtime.minicluster.MiniCluster.startTaskManager(MiniCluster.java:645) > 2021-12-23T02:54:46.2878247Z Dec 23 02:54:46 at > 
org.apache.flink.runtime.minicluster.MiniCluster.startTaskManagers(MiniCluster.java:626) > 2021-12-23T02:54:46.2878856Z Dec 23 02:54:46 at > org.apache.flink.runtime.minicluster.MiniCluster.start(MiniCluster.java:379) > 2021-12-23T02:54:46.2879487Z Dec 23 02:54:46 at > org.apache.flink.runtime.testutils.MiniClusterResource.startMiniCluster(MiniClusterResource.java:209) > 2021-12-23T02:54:46.2880152Z Dec 23 02:54:46 at > org.apache.flink.runtime.testutils.MiniClusterResource.before(MiniClusterResource.java:95) > 2021-12-23T02:54:46.2880821Z Dec 23 02:54:46 at > org.apache.flink.test.util.MiniClusterWithClientResource.before(MiniClusterWithClientResource.java:64) > 2021-12-23T02:54:46.2881519Z Dec 23 02:54:46 at > org.apache.flink.test.checkpointing.UnalignedCheckpointTestBase.execute(UnalignedCheckpo
[jira] [Created] (FLINK-25466) TTL configuration could parse in StateTtlConfig#DISABLED
Yun Tang created FLINK-25466: Summary: TTL configuration could parse in StateTtlConfig#DISABLED Key: FLINK-25466 URL: https://issues.apache.org/jira/browse/FLINK-25466 Project: Flink Issue Type: Bug Components: Runtime / State Backends Reporter: Yun Tang Assignee: Yun Tang Fix For: 1.15.0, 1.13.6, 1.14.3 The current API {{StateDescriptor#enableTimeToLive(StateTtlConfig)}} cannot handle {{StateTtlConfig#DISABLED}} due to its current implementation. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-25465) Correct the logic of materizating wrapped changelog state
[ https://issues.apache.org/jira/browse/FLINK-25465?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang updated FLINK-25465: - Summary: Correct the logic of materizating wrapped changelog state (was: Correct the logic to materizate on wrapped changelog state) > Correct the logic of materizating wrapped changelog state > - > > Key: FLINK-25465 > URL: https://issues.apache.org/jira/browse/FLINK-25465 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Major > Fix For: 1.15.0 > > > {{ChangelogKeyedStateBackend#keyValueStatesByName}} may store wrapped > state, such as TTL state or latency tracking state. It will throw an exception > in initMaterialization if the state is wrapped: > {code:java} > for (InternalKvState changelogState : keyValueStatesByName.values()) > { > checkState(changelogState instanceof ChangelogState); > ((ChangelogState) changelogState).resetWritingMetaFlag(); > } > {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
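One possible way to make such a loop tolerate wrappers is to peel off the decorators before casting. The sketch below only illustrates that idea and is not taken from the Flink code base or from the eventual fix: State, ChangelogState and WrapperState here are hypothetical stand-ins, not Flink's InternalKvState or its TTL and latency-tracking classes.

```java
import java.util.ArrayList;
import java.util.List;

final class UnwrapDemo {

    /** Stand-in for a key-value state handle (hypothetical, not Flink's InternalKvState). */
    interface State {}

    /** Stand-in for a state that logs changes and carries a resettable meta flag. */
    static final class ChangelogState implements State {
        boolean metaWritten = true;

        void resetWritingMetaFlag() {
            metaWritten = false;
        }
    }

    /** Stand-in for a decorator such as a TTL or latency-tracking wrapper. */
    static final class WrapperState implements State {
        final State delegate;

        WrapperState(State delegate) {
            this.delegate = delegate;
        }
    }

    /** Peels off wrappers until a ChangelogState is found; fails if there is none. */
    static ChangelogState unwrap(State state) {
        State current = state;
        while (current instanceof WrapperState) {
            current = ((WrapperState) current).delegate;
        }
        if (!(current instanceof ChangelogState)) {
            throw new IllegalStateException("No changelog state behind " + state);
        }
        return (ChangelogState) current;
    }

    /** Resets the flag on every registered state, whether it is wrapped or not. */
    static void resetAll(List<State> states) {
        for (State s : states) {
            unwrap(s).resetWritingMetaFlag();
        }
    }

    public static void main(String[] args) {
        ChangelogState plain = new ChangelogState();
        ChangelogState inner = new ChangelogState();
        List<State> states = new ArrayList<>();
        states.add(plain);
        states.add(new WrapperState(new WrapperState(inner)));
        resetAll(states);
        System.out.println(plain.metaWritten + " " + inner.metaWritten);
    }
}
```

An alternative design would be to register only the unwrapped changelog states in the map, so that no unwrapping is needed at materialization time.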
[jira] [Created] (FLINK-25465) Correct the logic to materizate on wrapped changelog state
Yun Tang created FLINK-25465: Summary: Correct the logic to materizate on wrapped changelog state Key: FLINK-25465 URL: https://issues.apache.org/jira/browse/FLINK-25465 Project: Flink Issue Type: Bug Components: Runtime / State Backends Reporter: Yun Tang Assignee: Yun Tang Fix For: 1.15.0 {{ChangelogKeyedStateBackend#keyValueStatesByName}} may store wrapped state, such as TTL state or latency tracking state. It will throw an exception in initMaterialization if the state is wrapped: {code:java} for (InternalKvState changelogState : keyValueStatesByName.values()) { checkState(changelogState instanceof ChangelogState); ((ChangelogState) changelogState).resetWritingMetaFlag(); } {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-25426) UnalignedCheckpointRescaleITCase.shouldRescaleUnalignedCheckpoint fails on AZP because it cannot allocate enough network buffers
[ https://issues.apache.org/jira/browse/FLINK-25426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17465955#comment-17465955 ] Yun Tang commented on FLINK-25426: -- Another instance: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=28607&view=logs&j=38d6b56a-d502-56fb-7b73-c09f8fe7becd&t=6e6509fa-8a5d-5a6c-e17e-64f5ecc17842 > UnalignedCheckpointRescaleITCase.shouldRescaleUnalignedCheckpoint fails on > AZP because it cannot allocate enough network buffers > > > Key: FLINK-25426 > URL: https://issues.apache.org/jira/browse/FLINK-25426 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.15.0 >Reporter: Till Rohrmann >Priority: Blocker > Labels: test-stability > Fix For: 1.15.0 > > > The test > {{UnalignedCheckpointRescaleITCase.shouldRescaleUnalignedCheckpoint}} fails > with > {code} > 2021-12-23T02:54:46.2862342Z Dec 23 02:54:46 [ERROR] > UnalignedCheckpointRescaleITCase.shouldRescaleUnalignedCheckpoint Time > elapsed: 2.992 s <<< ERROR! > 2021-12-23T02:54:46.2865774Z Dec 23 02:54:46 java.lang.OutOfMemoryError: > Could not allocate enough memory segments for NetworkBufferPool (required > (Mb): 64, allocated (Mb): 14, missing (Mb): 50). Cause: Direct buffer memory. > The direct out-of-memory error has occurred. This can mean two things: either > job(s) require(s) a larger size of JVM direct memory or there is a direct > memory leak. The direct memory can be allocated by user code or some of its > dependencies. In this case 'taskmanager.memory.task.off-heap.size' > configuration option should be increased. Flink framework and its > dependencies also consume the direct memory, mostly for network > communication. The most of network memory is managed by Flink and should not > result in out-of-memory error. In certain special cases, in particular for > jobs with high parallelism, the framework may require more direct memory > which is not managed by Flink. 
In this case > 'taskmanager.memory.framework.off-heap.size' configuration option should be > increased. If the error persists then there is probably a direct memory leak > in user code or some of its dependencies which has to be investigated and > fixed. The task executor has to be shutdown... > 2021-12-23T02:54:46.2868239Z Dec 23 02:54:46 at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.(NetworkBufferPool.java:138) > 2021-12-23T02:54:46.2868975Z Dec 23 02:54:46 at > org.apache.flink.runtime.io.network.NettyShuffleServiceFactory.createNettyShuffleEnvironment(NettyShuffleServiceFactory.java:140) > 2021-12-23T02:54:46.2869771Z Dec 23 02:54:46 at > org.apache.flink.runtime.io.network.NettyShuffleServiceFactory.createNettyShuffleEnvironment(NettyShuffleServiceFactory.java:94) > 2021-12-23T02:54:46.2870550Z Dec 23 02:54:46 at > org.apache.flink.runtime.io.network.NettyShuffleServiceFactory.createShuffleEnvironment(NettyShuffleServiceFactory.java:79) > 2021-12-23T02:54:46.2871312Z Dec 23 02:54:46 at > org.apache.flink.runtime.io.network.NettyShuffleServiceFactory.createShuffleEnvironment(NettyShuffleServiceFactory.java:58) > 2021-12-23T02:54:46.2872062Z Dec 23 02:54:46 at > org.apache.flink.runtime.taskexecutor.TaskManagerServices.createShuffleEnvironment(TaskManagerServices.java:414) > 2021-12-23T02:54:46.2872767Z Dec 23 02:54:46 at > org.apache.flink.runtime.taskexecutor.TaskManagerServices.fromConfiguration(TaskManagerServices.java:282) > 2021-12-23T02:54:46.2873436Z Dec 23 02:54:46 at > org.apache.flink.runtime.taskexecutor.TaskManagerRunner.startTaskManager(TaskManagerRunner.java:523) > 2021-12-23T02:54:46.2877615Z Dec 23 02:54:46 at > org.apache.flink.runtime.minicluster.MiniCluster.startTaskManager(MiniCluster.java:645) > 2021-12-23T02:54:46.2878247Z Dec 23 02:54:46 at > org.apache.flink.runtime.minicluster.MiniCluster.startTaskManagers(MiniCluster.java:626) > 2021-12-23T02:54:46.2878856Z Dec 23 02:54:46 at > 
org.apache.flink.runtime.minicluster.MiniCluster.start(MiniCluster.java:379) > 2021-12-23T02:54:46.2879487Z Dec 23 02:54:46 at > org.apache.flink.runtime.testutils.MiniClusterResource.startMiniCluster(MiniClusterResource.java:209) > 2021-12-23T02:54:46.2880152Z Dec 23 02:54:46 at > org.apache.flink.runtime.testutils.MiniClusterResource.before(MiniClusterResource.java:95) > 2021-12-23T02:54:46.2880821Z Dec 23 02:54:46 at > org.apache.flink.test.util.MiniClusterWithClientResource.before(MiniClusterWithClientResource.java:64) > 2021-12-23T02:54:46.2881519Z Dec 23 02:54:46 at > org.apache.flink.test.checkpointing.UnalignedCheckpointTestBase.execute(UnalignedCheckpointTestBase.java:151) > 2021-12-23T02:54:
[jira] [Resolved] (FLINK-24785) Relocate RocksDB's log under flink log directory by default
[ https://issues.apache.org/jira/browse/FLINK-24785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang resolved FLINK-24785. -- Resolution: Fixed merged in master: cdef0973b21196ca23a31486f5a996f0b397136d > Relocate RocksDB's log under flink log directory by default > --- > > Key: FLINK-24785 > URL: https://issues.apache.org/jira/browse/FLINK-24785 > Project: Flink > Issue Type: Sub-task > Components: Runtime / State Backends >Reporter: Yun Tang >Assignee: Nicholas Jiang >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > Previously, RocksDB's log was located in its own DB folder, which made > debugging RocksDB difficult. We could let RocksDB's log stay in Flink's log > directory by default. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-25260) Recovery fails when using changelog+s3+presto
[ https://issues.apache.org/jira/browse/FLINK-25260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17465646#comment-17465646 ] Yun Tang commented on FLINK-25260: -- I think this issue would be resolved by https://issues.apache.org/jira/browse/FLINK-25446 > Recovery fails when using changelog+s3+presto > - > > Key: FLINK-25260 > URL: https://issues.apache.org/jira/browse/FLINK-25260 > Project: Flink > Issue Type: Bug > Components: Connectors / FileSystem, Runtime / State Backends >Affects Versions: 1.15.0 >Reporter: Roman Khachatryan >Priority: Major > > Recovery succeeds if using local FS or hadoop S3 plugin, but fails with > Presto: > {code} > Caused by: java.lang.IllegalStateException > at org.apache.flink.util.Preconditions.checkState(Preconditions.java:177) > at > org.apache.flink.changelog.fs.StateChangeFormat$1.readChange(StateChangeFormat.java:138) > at > org.apache.flink.changelog.fs.StateChangeFormat$1.next(StateChangeFormat.java:129) > at > org.apache.flink.changelog.fs.StateChangeFormat$1.next(StateChangeFormat.java:98) > at > org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.next(StateChangelogHandleStreamHandleReader.java:76) > at > org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.next(StateChangelogHandleStreamHandleReader.java:61) > at > org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.readBackendHandle(ChangelogBackendRestoreOperation.java:94) > at > org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:74) > at > org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:221) > at > org.apache.flink.state.changelog.ChangelogStateBackend.createKeyedStateBackend(ChangelogStateBackend.java:145) > at > 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329) > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) > ... 13 more > {code} > This is likely caused by some intermediate buffers not being flushed. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-25458) Supports local recovery
[ https://issues.apache.org/jira/browse/FLINK-25458?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang updated FLINK-25458: - Component/s: Runtime / Checkpointing Runtime / State Backends > Supports local recovery > --- > > Key: FLINK-25458 > URL: https://issues.apache.org/jira/browse/FLINK-25458 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Checkpointing, Runtime / State Backends >Reporter: Yun Tang >Priority: Major > > Current changelog state-backend implementation cannot work well with local > recovery enabled. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25458) Supports local recovery
Yun Tang created FLINK-25458: Summary: Supports local recovery Key: FLINK-25458 URL: https://issues.apache.org/jira/browse/FLINK-25458 Project: Flink Issue Type: Sub-task Reporter: Yun Tang Current changelog state-backend implementation cannot work well with local recovery enabled. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25446) Avoid sanity check on read bytes on DataInputStream#read(byte[])
Yun Tang created FLINK-25446: Summary: Avoid sanity check on read bytes on DataInputStream#read(byte[]) Key: FLINK-25446 URL: https://issues.apache.org/jira/browse/FLINK-25446 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing, Runtime / State Backends Affects Versions: 1.14.2 Reporter: Yun Tang Assignee: Yun Tang Fix For: 1.15.0, 1.14.3 The current changelog-related code checks whether the number of bytes read equals the target size: {code:java} checkState(size == input.read(bytes)); {code} However, this is not correct, because the Javadoc states: {{"An attempt is made to read as many as len bytes, but a smaller number may be read, possibly zero."}} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Resolved] (FLINK-25429) Avoid to close output streams twice during uploading changelogs
[ https://issues.apache.org/jira/browse/FLINK-25429?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang resolved FLINK-25429. -- Resolution: Fixed > Avoid to close output streams twice during uploading changelogs > --- > > Key: FLINK-25429 > URL: https://issues.apache.org/jira/browse/FLINK-25429 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > The current uploader implementation closes {{stream}} and {{fsStream}} one > by one, which leads to {{fsStream}} being closed twice. > {code:java} > try (FSDataOutputStream fsStream = fileSystem.create(path, > NO_OVERWRITE)) { > fsStream.write(compression ? 1 : 0); > try (OutputStreamWithPos stream = wrap(fsStream); ) { > final Map> tasksOffsets > = new HashMap<>(); > for (UploadTask task : tasks) { > tasksOffsets.put(task, format.write(stream, > task.changeSets)); > } > FileStateHandle handle = new FileStateHandle(path, > stream.getPos()); > // WARN: streams have to be closed before returning the > results > // otherwise JM may receive invalid handles > return new LocalResult(tasksOffsets, handle); > } > } > {code} > Not all file systems support closing the same stream twice. -- This message was sent by Atlassian Jira (v8.20.1#820001)
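The double close described above can be reproduced with plain java.io. The following is a minimal sketch under stated assumptions, not the change that was merged: CountingStream and NonClosingWrapper are hypothetical helpers invented for this example, and the non-closing wrapper is just one way to make sure the underlying stream is closed exactly once.

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

final class CloseOnceDemo {

    /** Hypothetical sink that counts close() calls; a strict file system might throw on the second. */
    static final class CountingStream extends ByteArrayOutputStream {
        int closeCalls;

        @Override
        public void close() throws IOException {
            closeCalls++;
            super.close();
        }
    }

    /** Wrapper that flushes on close but leaves closing the underlying stream to its owner. */
    static final class NonClosingWrapper extends FilterOutputStream {
        NonClosingWrapper(OutputStream out) {
            super(out);
        }

        @Override
        public void close() throws IOException {
            flush();
        }
    }

    /** Nested try-with-resources in which both resources end up closing the same sink. */
    static int closeCallsWithPlainWrapper() throws IOException {
        CountingStream fs = new CountingStream();
        try (CountingStream fsStream = fs) {
            try (OutputStream stream = new FilterOutputStream(fsStream)) {
                stream.write(1);
            }
        }
        return fs.closeCalls;
    }

    /** Same nesting, but the inner wrapper only flushes, so the sink is closed once. */
    static int closeCallsWithNonClosingWrapper() throws IOException {
        CountingStream fs = new CountingStream();
        try (CountingStream fsStream = fs) {
            try (OutputStream stream = new NonClosingWrapper(fsStream)) {
                stream.write(1);
            }
        }
        return fs.closeCalls;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(closeCallsWithPlainWrapper());
        System.out.println(closeCallsWithNonClosingWrapper());
    }
}
```

Another option with the same effect is to manage only the outermost wrapper in a single try-with-resources and let it close the underlying stream.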
[jira] [Resolved] (FLINK-25094) The verify code in LatencyTrackingMapStateTest#verifyIterator is not actually executed
[ https://issues.apache.org/jira/browse/FLINK-25094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang resolved FLINK-25094. -- Resolution: Fixed > The verify code in LatencyTrackingMapStateTest#verifyIterator is not actually > executed > -- > > Key: FLINK-25094 > URL: https://issues.apache.org/jira/browse/FLINK-25094 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends, Tests >Reporter: Jinzhong Li >Assignee: Jinzhong Li >Priority: Minor > Labels: pull-request-available > Fix For: 1.15.0 > > > In LatencyTrackingMapStateTest, > iterator()/entries().iterator()/keys().iterator()/values().iterator() is > invoked before the verifyIterator method is invoked; that is, > iterator()/... is invoked before the test data is put into > latencyTrackingMapState. So the verification code is never actually executed, since > "iterator.hasNext()" is always false. > {code:java} > private void verifyIterator( > LatencyTrackingMapState > latencyTrackingState, > LatencyTrackingMapState.MapStateLatencyMetrics > latencyTrackingStateMetric, > Iterator iterator, > boolean removeIterator) > throws Exception { > ThreadLocalRandom random = ThreadLocalRandom.current(); > for (int index = 1; index <= SAMPLE_INTERVAL; index++) { > latencyTrackingState.put((long) index, random.nextDouble()); > } > int count = 1; > while (iterator.hasNext()) { > int expectedResult = count == SAMPLE_INTERVAL ? 0 : count; > assertEquals(expectedResult, > latencyTrackingStateMetric.getIteratorHasNextCount()); > iterator.next(); > assertEquals(expectedResult, > latencyTrackingStateMetric.getIteratorNextCount()); > if (removeIterator) { > iterator.remove(); > assertEquals(expectedResult, > latencyTrackingStateMetric.getIteratorRemoveCount()); > } > count += 1; > } > // as we call #hasNext on more time than #next, to avoid complex check, > just reset hasNext > // counter in the end. 
> latencyTrackingStateMetric.resetIteratorHasNextCount(); > latencyTrackingState.clear(); > } {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
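The pitfall described above, an iterator obtained before the data is written, can be shown in isolation. This is a sketch rather than the Flink test itself: it uses java.util.concurrent.CopyOnWriteArrayList as a stand-in for the map state because its iterator is documented to be a snapshot of the list at creation time, and the names StaleIteratorDemo and countChecks are invented for the example.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

final class StaleIteratorDemo {

    /** Runs the loop body once per element and reports how many checks actually executed. */
    static int countChecks(Iterator<Double> iterator) {
        int checks = 0;
        while (iterator.hasNext()) {
            iterator.next();
            checks++; // a real test would assert on metrics here
        }
        return checks;
    }

    /** Iterator created BEFORE the data is added: it still sees the empty snapshot. */
    static int iteratorBeforeAdd() {
        List<Double> state = new CopyOnWriteArrayList<>();
        Iterator<Double> it = state.iterator();
        for (int i = 1; i <= 3; i++) {
            state.add(i * 0.5);
        }
        return countChecks(it);
    }

    /** Iterator created AFTER the data is added: every element is visited. */
    static int iteratorAfterAdd() {
        List<Double> state = new CopyOnWriteArrayList<>();
        for (int i = 1; i <= 3; i++) {
            state.add(i * 0.5);
        }
        return countChecks(state.iterator());
    }

    public static void main(String[] args) {
        System.out.println(iteratorBeforeAdd());
        System.out.println(iteratorAfterAdd());
    }
}
```

A test written this way can also guard against silently skipped assertions by checking that the number of executed checks is greater than zero.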
[jira] [Commented] (FLINK-25094) The verify code in LatencyTrackingMapStateTest#verifyIterator is not actually executed
[ https://issues.apache.org/jira/browse/FLINK-25094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17464914#comment-17464914 ] Yun Tang commented on FLINK-25094: -- Merged in master: d08a1d0f4035485283f1063d4d0fc0a8aaca > The verify code in LatencyTrackingMapStateTest#verifyIterator is not actually > executed > -- > > Key: FLINK-25094 > URL: https://issues.apache.org/jira/browse/FLINK-25094 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends, Tests >Reporter: Jinzhong Li >Assignee: Jinzhong Li >Priority: Minor > Labels: pull-request-available > Fix For: 1.15.0 > > > In LatencyTrackingMapStateTest, > iterator()/entries().iterator()/keys().iterator()/values().iterator() is > invoked before the verifyIterator method is invoked; that is, > iterator()/... is invoked before the test data is put into > latencyTrackingMapState. So the verification code is never actually executed, since > "iterator.hasNext()" is always false. > {code:java} > private void verifyIterator( > LatencyTrackingMapState > latencyTrackingState, > LatencyTrackingMapState.MapStateLatencyMetrics > latencyTrackingStateMetric, > Iterator iterator, > boolean removeIterator) > throws Exception { > ThreadLocalRandom random = ThreadLocalRandom.current(); > for (int index = 1; index <= SAMPLE_INTERVAL; index++) { > latencyTrackingState.put((long) index, random.nextDouble()); > } > int count = 1; > while (iterator.hasNext()) { > int expectedResult = count == SAMPLE_INTERVAL ? 0 : count; > assertEquals(expectedResult, > latencyTrackingStateMetric.getIteratorHasNextCount()); > iterator.next(); > assertEquals(expectedResult, > latencyTrackingStateMetric.getIteratorNextCount()); > if (removeIterator) { > iterator.remove(); > assertEquals(expectedResult, > latencyTrackingStateMetric.getIteratorRemoveCount()); > } > count += 1; > } > // as we call #hasNext on more time than #next, to avoid complex check, > just reset hasNext > // counter in the end. 
> latencyTrackingStateMetric.resetIteratorHasNextCount(); > latencyTrackingState.clear(); > } {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-25429) Avoid to close output streams twice during uploading changelogs
[ https://issues.apache.org/jira/browse/FLINK-25429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17464911#comment-17464911 ] Yun Tang commented on FLINK-25429: -- merged in master: 804eb8dda556a2bea35c69a2662f13d1dafb9255 > Avoid to close output streams twice during uploading changelogs > --- > > Key: FLINK-25429 > URL: https://issues.apache.org/jira/browse/FLINK-25429 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > The current uploader implementation closes {{stream}} and {{fsStream}} one > by one, which leads to {{fsStream}} being closed twice. > {code:java} > try (FSDataOutputStream fsStream = fileSystem.create(path, > NO_OVERWRITE)) { > fsStream.write(compression ? 1 : 0); > try (OutputStreamWithPos stream = wrap(fsStream); ) { > final Map> tasksOffsets > = new HashMap<>(); > for (UploadTask task : tasks) { > tasksOffsets.put(task, format.write(stream, > task.changeSets)); > } > FileStateHandle handle = new FileStateHandle(path, > stream.getPos()); > // WARN: streams have to be closed before returning the > results > // otherwise JM may receive invalid handles > return new LocalResult(tasksOffsets, handle); > } > } > {code} > Not all file systems support closing the same stream twice. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25438) KafkaProducerExactlyOnceITCase.testMultipleSinkOperators failed due to topic 'exactlyTopicCustomOperator20' already exists
Yun Tang created FLINK-25438: Summary: KafkaProducerExactlyOnceITCase.testMultipleSinkOperators failed due to topic 'exactlyTopicCustomOperator20' already exists Key: FLINK-25438 URL: https://issues.apache.org/jira/browse/FLINK-25438 Project: Flink Issue Type: Bug Components: Connectors / Kafka, Tests Reporter: Yun Tang Dec 23 13:48:21 [ERROR] Failures: Dec 23 13:48:21 [ERROR] KafkaProducerExactlyOnceITCase.testMultipleSinkOperators:36->KafkaProducerTestBase.testExactlyOnce:236->KafkaTestBase.createTestTopic:216 Create test topic : exactlyTopicCustomOperator20 failed, org.apache.kafka.common.errors.TopicExistsException: Topic 'exactlyTopicCustomOperator20' already exists. instance: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=28524&view=logs&j=c5f0071e-1851-543e-9a45-9ac140befc32&t=15a22db7-8faa-5b34-3920-d33c9f0ca23c -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-25429) Avoid to close output streams twice during uploading changelogs
[ https://issues.apache.org/jira/browse/FLINK-25429?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang updated FLINK-25429: - Fix Version/s: 1.15.0 > Avoid to close output streams twice during uploading changelogs > --- > > Key: FLINK-25429 > URL: https://issues.apache.org/jira/browse/FLINK-25429 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Major > Fix For: 1.15.0 > > > The current uploader implementation closes {{stream}} and {{fsStream}} one > by one, which leads to {{fsStream}} being closed twice. > {code:java} > try (FSDataOutputStream fsStream = fileSystem.create(path, > NO_OVERWRITE)) { > fsStream.write(compression ? 1 : 0); > try (OutputStreamWithPos stream = wrap(fsStream); ) { > final Map> tasksOffsets > = new HashMap<>(); > for (UploadTask task : tasks) { > tasksOffsets.put(task, format.write(stream, > task.changeSets)); > } > FileStateHandle handle = new FileStateHandle(path, > stream.getPos()); > // WARN: streams have to be closed before returning the > results > // otherwise JM may receive invalid handles > return new LocalResult(tasksOffsets, handle); > } > } > {code} > Not all file systems support closing the same stream twice. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25429) Avoid to close output streams twice during uploading changelogs
Yun Tang created FLINK-25429: Summary: Avoid to close output streams twice during uploading changelogs Key: FLINK-25429 URL: https://issues.apache.org/jira/browse/FLINK-25429 Project: Flink Issue Type: Improvement Components: Runtime / State Backends Reporter: Yun Tang Assignee: Yun Tang The current uploader implementation closes {{stream}} and {{fsStream}} one by one, which leads to {{fsStream}} being closed twice. {code:java} try (FSDataOutputStream fsStream = fileSystem.create(path, NO_OVERWRITE)) { fsStream.write(compression ? 1 : 0); try (OutputStreamWithPos stream = wrap(fsStream); ) { final Map> tasksOffsets = new HashMap<>(); for (UploadTask task : tasks) { tasksOffsets.put(task, format.write(stream, task.changeSets)); } FileStateHandle handle = new FileStateHandle(path, stream.getPos()); // WARN: streams have to be closed before returning the results // otherwise JM may receive invalid handles return new LocalResult(tasksOffsets, handle); } } {code} Not all file systems support closing the same stream twice. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Assigned] (FLINK-17808) Rename checkpoint meta file to "_metadata" until it has completed writing
[ https://issues.apache.org/jira/browse/FLINK-17808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang reassigned FLINK-17808: Assignee: Junfan Zhang > Rename checkpoint meta file to "_metadata" until it has completed writing > - > > Key: FLINK-17808 > URL: https://issues.apache.org/jira/browse/FLINK-17808 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing >Affects Versions: 1.10.0 >Reporter: Yun Tang >Assignee: Junfan Zhang >Priority: Minor > Labels: auto-deprioritized-major, pull-request-available > Fix For: 1.15.0 > > > In practice, some developers or customers use some strategy to find the > most recent _metadata as the checkpoint to recover from (e.g. as many proposals in > FLINK-9043 suggest). However, the existence of a "_metadata" file does not mean > the checkpoint has been completed, as the writing of the "_metadata" > file could be interrupted by a forced quit (e.g. yarn application -kill). > We could have the checkpoint meta stream write data to a file named > "_metadata.inprogress" and rename it to "_metadata" once writing has completed. > By doing so, we could ensure the "_metadata" file is not broken. -- This message was sent by Atlassian Jira (v8.20.1#820001)
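The write-then-rename idea from the description can be sketched with java.nio on a local file system. This is only an illustration under assumptions: it does not use Flink's FileSystem abstraction, the class and method names are invented, and the atomic rename it relies on is not available on every storage system, which is why the PR also has to handle file systems without such support.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

final class MetadataRenameDemo {

    /** Writes to "_metadata.inprogress" first and renames to "_metadata" only after a complete write. */
    static Path writeMetadata(Path checkpointDir, byte[] content) throws IOException {
        Path inProgress = checkpointDir.resolve("_metadata.inprogress");
        Path finalPath = checkpointDir.resolve("_metadata");
        Files.write(inProgress, content);
        // ATOMIC_MOVE fails with AtomicMoveNotSupportedException if the file system cannot rename atomically.
        return Files.move(inProgress, finalPath, StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("chk-demo");
        Path meta = writeMetadata(dir, "checkpoint-meta".getBytes(StandardCharsets.UTF_8));
        // After a successful call only the final file exists.
        System.out.println(Files.exists(meta) + " " + Files.exists(dir.resolve("_metadata.inprogress")));
    }
}
```

If the process is killed during the write, only "_metadata.inprogress" is left behind, so a reader that looks for "_metadata" never picks up a partially written file.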
[jira] [Commented] (FLINK-17808) Rename checkpoint meta file to "_metadata" until it has completed writing
[ https://issues.apache.org/jira/browse/FLINK-17808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17464459#comment-17464459 ] Yun Tang commented on FLINK-17808: -- Since [~zuston] had already provided a PR for review, I have assigned this ticket to him. Please refactor the PR: 1. Consider file systems which do not support a recoverable writer. 2. Resolve all failed tests of the current PR. 3. Add a separate test to verify the logic. > Rename checkpoint meta file to "_metadata" until it has completed writing > - > > Key: FLINK-17808 > URL: https://issues.apache.org/jira/browse/FLINK-17808 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing >Affects Versions: 1.10.0 >Reporter: Yun Tang >Assignee: Junfan Zhang >Priority: Minor > Labels: auto-deprioritized-major, pull-request-available > Fix For: 1.15.0 > > > In practice, some developers or customers use some strategy to find the > most recent _metadata as the checkpoint to recover from (e.g. as many proposals in > FLINK-9043 suggest). However, the existence of a "_metadata" file does not mean > the checkpoint has been completed, as the writing of the "_metadata" > file could be interrupted by a forced quit (e.g. yarn application -kill). > We could have the checkpoint meta stream write data to a file named > "_metadata.inprogress" and rename it to "_metadata" once writing has completed. > By doing so, we could ensure the "_metadata" file is not broken. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25423) Enable loading state backend via configuration in state processor api
Yun Tang created FLINK-25423:

Summary: Enable loading state backend via configuration in state processor api
Key: FLINK-25423
URL: https://issues.apache.org/jira/browse/FLINK-25423
Project: Flink
Issue Type: Improvement
Components: API / State Processor, Runtime / State Backends
Reporter: Yun Tang
Fix For: 1.15.0

Currently, the state processor API loads a savepoint via an explicitly initialized state backend on the client side, similar to {{StreamExecutionEnvironment#setStateBackend(stateBackend)}}:
{code:java}
Savepoint.load(bEnv, "hdfs://path/", new HashMapStateBackend());
{code}
As we all know, the stream environment also supports loading the state backend via configuration, which provides flexibility, especially for customized state backends. The state processor API could benefit from a similar ability.
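The difference between passing a backend instance explicitly and resolving it from configuration can be sketched in plain Java. Everything here is hypothetical and for illustration only: `BackendLoaderSketch`, the `Backend` interface and the two stand-in backend classes are not Flink classes, and this is not how Flink's loader is implemented. The key name "state.backend" and the rule that an explicitly passed backend wins over the configured one are assumptions of this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.function.Supplier;

// Hypothetical illustration class; not part of Flink.
class BackendLoaderSketch {
    /** Stand-in for a state backend abstraction. */
    interface Backend {
        String name();
    }

    static final class HashMapBackend implements Backend {
        public String name() { return "hashmap"; }
    }

    static final class RocksDbBackend implements Backend {
        public String name() { return "rocksdb"; }
    }

    // Assumed configuration key for this sketch.
    static final String BACKEND_KEY = "state.backend";

    // Short names that map to built-in backends.
    private static final Map<String, Supplier<Backend>> SHORTCUTS = new HashMap<>();
    static {
        SHORTCUTS.put("hashmap", HashMapBackend::new);
        SHORTCUTS.put("rocksdb", RocksDbBackend::new);
    }

    /**
     * Returns the explicitly passed backend if there is one; otherwise
     * resolves the backend from configuration, either by short name or by
     * treating the value as a class name (useful for customized backends).
     */
    static Backend fromConfig(Properties conf, Backend explicit) {
        if (explicit != null) {
            return explicit;
        }
        String value = conf.getProperty(BACKEND_KEY, "hashmap");
        Supplier<Backend> shortcut = SHORTCUTS.get(value);
        if (shortcut != null) {
            return shortcut.get();
        }
        try {
            return (Backend) Class.forName(value).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalArgumentException("Cannot load backend: " + value, e);
        }
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        conf.setProperty(BACKEND_KEY, "rocksdb");
        System.out.println(fromConfig(conf, null).name());
    }
}
```

With a lookup of this kind, a caller could omit the backend argument and let the deployment configuration decide, which is the flexibility the ticket asks for.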
[jira] [Commented] (FLINK-25360) Add State Desc to CheckpointMetadata
[ https://issues.apache.org/jira/browse/FLINK-25360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17464242#comment-17464242 ]

Yun Tang commented on FLINK-25360:

[~liufangqi] Since RocksDB incremental checkpoints already have enough information in the checkpoint meta, could you try to use the state processor API to achieve your goal with them?

Your request actually raises two questions, and I think they deserve two separate discussion threads on the dev mailing list:
# Could we write meta state info into the savepoint meta? [~dwysakowicz] what do you think of this, especially since you have already done the unified savepoint format work?
# Should we persist additional info, such as the default value and reducingFunction, so that the whole state descriptor is persisted? I prefer to store them. [~tzulitai] what do you think of this?

BTW, since many people are on Christmas vacation, maybe we can launch the discussions after New Year's Day.

> Add State Desc to CheckpointMetadata
> -
>
> Key: FLINK-25360
> URL: https://issues.apache.org/jira/browse/FLINK-25360
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Checkpointing, Runtime / State Backends
> Reporter: 刘方奇
> Priority: Major
> Attachments: image-2021-12-17-20-01-42-423.png
>
> Currently we cannot get the State Descriptor info from the checkpoint meta. For example, if we use the state-processor-api to load state and then rewrite it, we cannot use the state flexibly.
> There may be other cases where the State Descriptor is needed, so can we add this info?