[jira] [Updated] (FLINK-25528) state processor api do not support increment checkpoint

2022-02-21 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-25528:
-
Fix Version/s: 1.16.0

> state processor api do not support increment checkpoint
> ---
>
> Key: FLINK-25528
> URL: https://issues.apache.org/jira/browse/FLINK-25528
> Project: Flink
>  Issue Type: Improvement
>  Components: API / State Processor, Runtime / State Backends
>Reporter: 刘方奇
>Assignee: 刘方奇
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> As the title says, the state processor API uses the savepoint option by 
> default to snapshot state in org.apache.flink.state.api.output.SnapshotUtils.
> But in many cases we may need to snapshot state incrementally, which has 
> better performance than a savepoint.
> Shall we add a config option to choose the checkpoint type, just like the 
> Flink streaming backends?
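A minimal sketch of what such a configuration switch might look like. All names below (the enum, the config class) are hypothetical illustrations of the proposal, not actual Flink API:

```java
// Hypothetical sketch of a snapshot-type option for the state processor API.
// None of these names exist in Flink; they only illustrate the proposal.
enum SnapshotType { SAVEPOINT, FULL_CHECKPOINT, INCREMENTAL_CHECKPOINT }

final class SnapshotConfig {
    private final SnapshotType type;

    SnapshotConfig(SnapshotType type) { this.type = type; }

    /** Incremental checkpoints only upload new files, so they are usually cheaper. */
    boolean isIncremental() { return type == SnapshotType.INCREMENTAL_CHECKPOINT; }

    SnapshotType type() { return type; }
}
```

A SnapshotUtils-like writer could then branch on `isIncremental()` instead of always taking the savepoint path.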



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26050) Too many small sst files in rocksdb state backend when using processing time window

2022-02-20 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17495303#comment-17495303
 ] 

Yun Tang commented on FLINK-26050:
--

[~shenjiaqi] Since Flink has disabled the WAL, the suggestion of configuring 
{{log_size_for_flush}} should not work here.

This seems to be a limitation of RocksDB level compaction. I suggest storing 
those timers on heap, since there are usually not so many timers and this 
works fine in many cases.
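For reference, keeping other keyed state in RocksDB while storing timer state on the JVM heap can be configured via the timer service factory option (check the documentation of your Flink version for availability), e.g. in flink-conf.yaml:

```yaml
# Keep keyed state in RocksDB, but store timer state on the JVM heap,
# avoiding the "processing_window-timers" column family entirely.
state.backend.rocksdb.timer-service.factory: HEAP
```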

> Too many small sst files in rocksdb state backend when using processing time 
> window
> ---
>
> Key: FLINK-26050
> URL: https://issues.apache.org/jira/browse/FLINK-26050
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.10.2, 1.14.3
>Reporter: shen
>Priority: Major
> Attachments: image-2022-02-09-21-22-13-920.png, 
> image-2022-02-11-10-32-14-956.png, image-2022-02-11-10-36-46-630.png, 
> image-2022-02-14-13-04-52-325.png
>
>
> When using a processing time window, under some workloads there will be a 
> lot of small sst files (several KB) in the RocksDB local directory, which 
> may cause a "Too many files" error.
> Using the rocksdb tool ldb to inspect the content of these sst files shows:
>  * the column family of these small sst files is "processing_window-timers".
>  * most sst files are in level-1.
>  * records in the sst files are almost all kTypeDeletion.
>  * the creation times of the sst files correspond to the checkpoint interval.
> These small sst files seem to be generated when a Flink checkpoint is 
> triggered. Although all their content consists of delete tags, they are not 
> compacted and deleted by RocksDB compaction, because they do not intersect 
> with each other (rocksdb [compaction trivial 
> move|https://github.com/facebook/rocksdb/wiki/Compaction-Trivial-Move]). And 
> there seems to be no chance to delete them, because they are small and do 
> not intersect with other sst files.
>  
> I will attach a simple program to reproduce the problem.
>  
> Timers in a processing time window are generated in strictly ascending order 
> (both puts and deletes), so a workload may happen to generate level-0 sst 
> files that do not intersect with each other (for example: the processing 
> window size is much smaller than the checkpoint interval, and no window 
> content crosses a checkpoint interval, or no new data arrives in a window 
> crossing a checkpoint interval). Many small sst files will then be generated 
> until the job is restored from a savepoint, or incremental checkpointing is 
> disabled.
>  
> A similar problem may exist when users use timers in operators under the 
> same workload.
>  
> Code to reproduce the problem:
> {code:java}
> package org.apache.flink.jira;
> import lombok.extern.slf4j.Slf4j;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.configuration.RestOptions;
> import org.apache.flink.configuration.TaskManagerOptions;
> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
> import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.util.Collector;
> import java.util.Collections;
> import java.util.List;
> import java.util.Random;
> @Slf4j
> public class StreamApp  {
>   public static void main(String[] args) throws Exception {
>     Configuration config = new Configuration();
>     config.set(RestOptions.ADDRESS, "127.0.0.1");
>     config.set(RestOptions.PORT, 10086);
>     config.set(TaskManagerOptions.NUM_TASK_SLOTS, 6);
>     new StreamApp().configureApp(StreamExecutionEnvironment.createLocalEnvironment(1, config));
>   }
>   public void configureApp(StreamExecutionEnvironment env) throws Exception {
>     env.enableCheckpointing(2); // 20sec
>     RocksDBStateBackend rocksDBStateBackend =
>         new RocksDBStateBackend("file:///Users/shenjiaqi/Workspace/jira/flink-51/checkpoints/", true); // need to be reconfigured
>     rocksDBStateBackend.setDbStoragePath("/Users/shenjiaqi/Workspace/jira/flink-51/flink/rocksdb_local_db"); // need to be reconfigured
>     env.setStateBackend(rocksDBStateBackend);
>     env.getCheckpointConfig().setCheckpointTimeout(10);
>     env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);
>     env.setParallelism(1)

[jira] [Commented] (FLINK-26196) error when Incremental Checkpoints by RocksDb

2022-02-20 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17495296#comment-17495296
 ] 

Yun Tang commented on FLINK-26196:
--

[~hjw], apart from providing more logs: what kind of environment did you use? 
A Docker image on ECS, or a YARN container on a Linux machine?


> error when Incremental Checkpoints  by RocksDb 
> ---
>
> Key: FLINK-26196
> URL: https://issues.apache.org/jira/browse/FLINK-26196
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.13.2
>Reporter: hjw
>Priority: Critical
>
> When I use incremental checkpoints with RocksDB, errors happen occasionally. 
> Fortunately, the Flink job keeps running normally.
> Log:
> {code:java}
> java.io.IOException: Could not perform checkpoint 2804 for operator 
> cc-rule-keyByAndReduceStream (2/8)#1.
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1045)
>  at 
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:135)
>  at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:250)
>  at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:61)
>  at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:431)
>  at 
> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:61)
>  at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:227)
>  at 
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180)
>  at 
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158)
>  at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
>  at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
>  at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not 
> complete snapshot 2804 for operator cc-rule-keyByAndReduceStream (2/8)#1. 
> Failure reason: Checkpoint was declined.
>  at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:264)
>  at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:169)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
>  at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:706)
>  at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:627)
>  at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:590)
>  at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:312)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:1089)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1073)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1029)
>  ... 19 more
> Caused by: org.roc

[jira] [Commented] (FLINK-25528) state processor api do not support increment checkpoint

2022-02-18 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17494454#comment-17494454
 ] 

Yun Tang commented on FLINK-25528:
--

[~liufangqi] Thanks for your explanation. If the current state processor API 
cannot write native checkpoints, I think we could make a full native 
checkpoint instead of a savepoint for state processor API writes. This 
deserves an improvement patch. BTW, I think you can refer to 
https://issues.apache.org/jira/browse/FLINK-25276 to change your PR.

> state processor api do not support increment checkpoint
> ---
>
> Key: FLINK-25528
> URL: https://issues.apache.org/jira/browse/FLINK-25528
> Project: Flink
>  Issue Type: Improvement
>  Components: API / State Processor, Runtime / State Backends
>Reporter: 刘方奇
>Priority: Major
>
> As the title says, the state processor API uses the savepoint option by 
> default to snapshot state in org.apache.flink.state.api.output.SnapshotUtils.
> But in many cases we may need to snapshot state incrementally, which has 
> better performance than a savepoint.
> Shall we add a config option to choose the checkpoint type, just like the 
> Flink streaming backends?



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25528) state processor api do not support increment checkpoint

2022-02-17 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17494345#comment-17494345
 ] 

Yun Tang commented on FLINK-25528:
--

[~liufangqi] I don't understand the title. Actually, the state processor API 
can work with incremental checkpoints for both reading and writing. Have you 
ever tried to load an incremental checkpoint (the checkpoint path to load 
should be {{path/chk-x}})?

> state processor api do not support increment checkpoint
> ---
>
> Key: FLINK-25528
> URL: https://issues.apache.org/jira/browse/FLINK-25528
> Project: Flink
>  Issue Type: Improvement
>  Components: API / State Processor, Runtime / State Backends
>Reporter: 刘方奇
>Priority: Major
>
> As the title says, the state processor API uses the savepoint option by 
> default to snapshot state in org.apache.flink.state.api.output.SnapshotUtils.
> But in many cases we may need to snapshot state incrementally, which has 
> better performance than a savepoint.
> Shall we add a config option to choose the checkpoint type, just like the 
> Flink streaming backends?



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-26170) checkpoint time too long

2022-02-15 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-26170.

Resolution: Information Provided

> checkpoint  time  too long 
> ---
>
> Key: FLINK-26170
> URL: https://issues.apache.org/jira/browse/FLINK-26170
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.14.3
>Reporter: zhangqw152
>Priority: Major
> Attachments: image-2022-02-16-09-46-42-675.png, 
> image-2022-02-16-09-54-48-482.png
>
>
> !image-2022-02-16-09-46-42-675.png!
> !image-2022-02-16-09-54-48-482.png!



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26170) checkpoint time too long

2022-02-15 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17492972#comment-17492972
 ] 

Yun Tang commented on FLINK-26170:
--

[~zhangqw152], the checkpoint duration depends on many possible factors: maybe 
the job is backpressured, or some customized operator hangs during the 
synchronous phase of the checkpoint. The information you provided is not 
enough to diagnose this.
Moreover, the user mailing list is a better place to ask why a checkpoint 
expired. Since the current ticket description does not provide any useful 
information, I will close this ticket. If we can identify that this is really 
a bug, we can then create a specific ticket targeting it.

> checkpoint  time  too long 
> ---
>
> Key: FLINK-26170
> URL: https://issues.apache.org/jira/browse/FLINK-26170
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.14.3
>Reporter: zhangqw152
>Priority: Major
> Attachments: image-2022-02-16-09-46-42-675.png, 
> image-2022-02-16-09-54-48-482.png
>
>
> !image-2022-02-16-09-46-42-675.png!
> !image-2022-02-16-09-54-48-482.png!



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-26170) checkpoint time too long

2022-02-15 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-26170:
-
Priority: Major  (was: Critical)

> checkpoint  time  too long 
> ---
>
> Key: FLINK-26170
> URL: https://issues.apache.org/jira/browse/FLINK-26170
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.14.3
>Reporter: zhangqw152
>Priority: Major
> Attachments: image-2022-02-16-09-46-42-675.png, 
> image-2022-02-16-09-54-48-482.png
>
>
> !image-2022-02-16-09-46-42-675.png!
> !image-2022-02-16-09-54-48-482.png!



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (FLINK-26101) Avoid shared state registry to discard multi-registered identical changelog state

2022-02-14 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang resolved FLINK-26101.
--
Resolution: Fixed

Merged in master: 8b25ae64794727e49de66d0b61e0955da7f21961

> Avoid shared state registry to discard multi-registered identical changelog 
> state
> -
>
> Key: FLINK-26101
> URL: https://issues.apache.org/jira/browse/FLINK-26101
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Under the change-log state backend, we register the same materialized keyed 
> state handle multiple times, and {{SharedStateRegistryImpl}} will discard 
> the duplicated state handle.
> {code:java}
> if (!Objects.equals(state, entry.stateHandle)) {
> if (entry.confirmed || isPlaceholder(state)) {
> scheduledStateDeletion = state;
> } else {
> // Old entry is not in a confirmed checkpoint yet, and the new one 
> differs.
> // This might result from (omitted KG range here for simplicity):
> // 1. Flink recovers from a failure using a checkpoint 1
> // 2. State Backend is initialized to UID xyz and a set of SST: { 
> 01.sst }
> // 3. JM triggers checkpoint 2
> // 4. TM sends handle: "xyz-002.sst"; JM registers it under 
> "xyz-002.sst"
> // 5. TM crashes; everything is repeated from (2)
> // 6. TM recovers from CP 1 again: backend UID "xyz", SST { 01.sst }
> // 7. JM triggers checkpoint 3
> // 8. TM sends NEW state "xyz-002.sst"
> // 9. JM discards it as duplicate
> // 10. checkpoint completes, but a wrong SST file is used
> // So we use a new entry and discard the old one:
> scheduledStateDeletion = entry.stateHandle;
> entry.stateHandle = state;
> }
> {code}
> Thus, we need to implement the {{#equals}} method for the registered state 
> handles.
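To illustrate why the fix matters, here is a simplified stand-in (not the real Flink state-handle classes): `Objects.equals` falls back to reference equality unless `equals` is overridden, so two logically identical handles registered twice look different to the registry, and one of them gets scheduled for deletion.

```java
import java.util.Objects;

// Simplified stand-in for a registered state handle; not the real Flink class.
final class Handle {
    final String backendUid;
    final String fileName;

    Handle(String backendUid, String fileName) {
        this.backendUid = backendUid;
        this.fileName = fileName;
    }

    // Without this override, Objects.equals(a, b) compares references, so
    // re-registering an identical handle is mistaken for a conflicting one.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Handle)) return false;
        Handle h = (Handle) o;
        return backendUid.equals(h.backendUid) && fileName.equals(h.fileName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(backendUid, fileName);
    }
}
```

With `equals` implemented this way, the `!Objects.equals(state, entry.stateHandle)` branch quoted above is no longer taken for a re-registered identical handle, so nothing is scheduled for deletion.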



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-26101) Avoid shared state registry to discard multi-registered identical changelog state

2022-02-13 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-26101:
-
Summary: Avoid shared state registry to discard multi-registered identical 
changelog state  (was: Avoid shared state registry to discard multi-registered 
changelog state)

> Avoid shared state registry to discard multi-registered identical changelog 
> state
> -
>
> Key: FLINK-26101
> URL: https://issues.apache.org/jira/browse/FLINK-26101
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
> Fix For: 1.15.0
>
>
> Under the change-log state backend, we register the same materialized keyed 
> state handle multiple times, and {{SharedStateRegistryImpl}} will discard 
> the duplicated state handle.
> {code:java}
> if (!Objects.equals(state, entry.stateHandle)) {
> if (entry.confirmed || isPlaceholder(state)) {
> scheduledStateDeletion = state;
> } else {
> // Old entry is not in a confirmed checkpoint yet, and the new one 
> differs.
> // This might result from (omitted KG range here for simplicity):
> // 1. Flink recovers from a failure using a checkpoint 1
> // 2. State Backend is initialized to UID xyz and a set of SST: { 
> 01.sst }
> // 3. JM triggers checkpoint 2
> // 4. TM sends handle: "xyz-002.sst"; JM registers it under 
> "xyz-002.sst"
> // 5. TM crashes; everything is repeated from (2)
> // 6. TM recovers from CP 1 again: backend UID "xyz", SST { 01.sst }
> // 7. JM triggers checkpoint 3
> // 8. TM sends NEW state "xyz-002.sst"
> // 9. JM discards it as duplicate
> // 10. checkpoint completes, but a wrong SST file is used
> // So we use a new entry and discard the old one:
> scheduledStateDeletion = entry.stateHandle;
> entry.stateHandle = state;
> }
> {code}
> Thus, we need to implement the {{#equals}} method for the registered state 
> handles.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-26101) Avoid shared state registry to discard multi-registered changelog state

2022-02-13 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-26101:
-
Summary: Avoid shared state registry to discard multi-registered changelog 
state  (was: Avoid shared state registry to discard duplicate changelog state)

> Avoid shared state registry to discard multi-registered changelog state
> ---
>
> Key: FLINK-26101
> URL: https://issues.apache.org/jira/browse/FLINK-26101
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
> Fix For: 1.15.0
>
>
> Under the change-log state backend, we register the same materialized keyed 
> state handle multiple times, and {{SharedStateRegistryImpl}} will discard 
> the duplicated state handle.
> {code:java}
> if (!Objects.equals(state, entry.stateHandle)) {
> if (entry.confirmed || isPlaceholder(state)) {
> scheduledStateDeletion = state;
> } else {
> // Old entry is not in a confirmed checkpoint yet, and the new one 
> differs.
> // This might result from (omitted KG range here for simplicity):
> // 1. Flink recovers from a failure using a checkpoint 1
> // 2. State Backend is initialized to UID xyz and a set of SST: { 
> 01.sst }
> // 3. JM triggers checkpoint 2
> // 4. TM sends handle: "xyz-002.sst"; JM registers it under 
> "xyz-002.sst"
> // 5. TM crashes; everything is repeated from (2)
> // 6. TM recovers from CP 1 again: backend UID "xyz", SST { 01.sst }
> // 7. JM triggers checkpoint 3
> // 8. TM sends NEW state "xyz-002.sst"
> // 9. JM discards it as duplicate
> // 10. checkpoint completes, but a wrong SST file is used
> // So we use a new entry and discard the old one:
> scheduledStateDeletion = entry.stateHandle;
> entry.stateHandle = state;
> }
> {code}
> Thus, we need to implement the {{#equals}} method for the registered state 
> handles.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26101) Avoid shared state registry to discard duplicate changelog state

2022-02-13 Thread Yun Tang (Jira)
Yun Tang created FLINK-26101:


 Summary: Avoid shared state registry to discard duplicate 
changelog state
 Key: FLINK-26101
 URL: https://issues.apache.org/jira/browse/FLINK-26101
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Reporter: Yun Tang
Assignee: Yun Tang
 Fix For: 1.15.0


Under the change-log state backend, we register the same materialized keyed 
state handle multiple times, and {{SharedStateRegistryImpl}} will discard the 
duplicated state handle.

{code:java}
if (!Objects.equals(state, entry.stateHandle)) {
if (entry.confirmed || isPlaceholder(state)) {
scheduledStateDeletion = state;
} else {
// Old entry is not in a confirmed checkpoint yet, and the new one 
differs.
// This might result from (omitted KG range here for simplicity):
// 1. Flink recovers from a failure using a checkpoint 1
// 2. State Backend is initialized to UID xyz and a set of SST: { 
01.sst }
// 3. JM triggers checkpoint 2
// 4. TM sends handle: "xyz-002.sst"; JM registers it under 
"xyz-002.sst"
// 5. TM crashes; everything is repeated from (2)
// 6. TM recovers from CP 1 again: backend UID "xyz", SST { 01.sst }
// 7. JM triggers checkpoint 3
// 8. TM sends NEW state "xyz-002.sst"
// 9. JM discards it as duplicate
// 10. checkpoint completes, but a wrong SST file is used
// So we use a new entry and discard the old one:
scheduledStateDeletion = entry.stateHandle;
entry.stateHandle = state;
}
{code}

Thus, we need to implement the {{#equals}} method for the registered state 
handles.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (FLINK-25466) TTL configuration could parse in StateTtlConfig#DISABLED

2022-02-13 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25466?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang resolved FLINK-25466.
--
Resolution: Fixed

merged 
master: c1c966568b50c03257428e018f3cacd88b7dd116
release-1.14: 30994a1788085034ed1b467a5df6253ee44b1da6
release-1.13: ab86ffa78a2952126d90a4d4fa3ce0b16b957991

> TTL configuration could parse in StateTtlConfig#DISABLED
> 
>
> Key: FLINK-25466
> URL: https://issues.apache.org/jira/browse/FLINK-25466
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.14.4, 1.13.7
>
>
> The current API {{StateDescriptor#enableTimeToLive(StateTtlConfig)}} cannot 
> handle StateTtlConfig#DISABLED due to its current implementation.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (FLINK-25478) Same materialized state handle should not register multi times

2022-02-11 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang resolved FLINK-25478.
--
Resolution: Fixed

Merged in master: ae166080fd1458b6e761abcf18d9302a4dcefbcd and 
a519ed1b11e7bf085d3eee6c6f39cab5967269d0

> Same materialized state handle should not register multi times
> --
>
> Key: FLINK-25478
> URL: https://issues.apache.org/jira/browse/FLINK-25478
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Currently, changelog materialization calls the RocksDB state backend's 
> snapshot method to generate an {{IncrementalRemoteKeyedStateHandle}} as 
> ChangelogStateBackendHandleImpl's materialized artifact. And before the next 
> materialization, it will always report the same 
> {{IncrementalRemoteKeyedStateHandle}} as before.
> It's fine to register this the 1st time. However, on the 2nd registration of 
> the {{IncrementalRemoteKeyedStateHandle}} (via 
> {{ChangelogStateBackendHandleImpl#registerSharedStates}}), it will discard 
> the private state artifacts without checking the registration reference:
> IncrementalRemoteKeyedStateHandle:
> {code:java}
> public void discardState() throws Exception {
>     try {
>         StateUtil.bestEffortDiscardAllStateObjects(privateState.values());
>     } catch (Exception e) {
>         LOG.warn("Could not properly discard misc file states.", e);
>     }
> }
> {code}
> Thus, this would delete the private state (such as RocksDB's MANIFEST), and 
> on restore the job would report a FileNotFoundException.
>  
> {code:java}
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught 
> unexpected exception.
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:403)
> at 
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:477)
>  
> at 
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:90)
>  
>     at 
> org.apache.flink.state.changelog.ChangelogStateBackend.lambda$createKeyedStateBackend$1(ChangelogStateBackend.java:153)
> at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:68)
>     at 
> org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:221)
>  
> at 
> org.apache.flink.state.changelog.ChangelogStateBackend.createKeyedStateBackend(ChangelogStateBackend.java:145)
>  
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
>  
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>  
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>  
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
>  
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
>  
> ... 10 more
> Caused by: java.io.FileNotFoundException: x
> at 
> org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.open(PluginFileSystemFactory.java:127)
>  
> at 
> org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
>  
> at 
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:127)
>  
> at 
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:110)
>     at 
> org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:49)
>  
> at 
> java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
>  ~[?:1.8.0_102]
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)
>  ~[?:1.8.0_102]
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)
>  ~[?:1.8.0_102]
> ... 1 more {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25557) Introduce incremental/full checkpoint size stats

2022-02-11 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-25557:
-
Release Note: Introduce metrics of persisted bytes within each checkpoint (via 
REST API and UI), which help users know how much data was persisted during an 
incremental or change-log based checkpoint.

> Introduce incremental/full checkpoint size stats
> 
>
> Key: FLINK-25557
> URL: https://issues.apache.org/jira/browse/FLINK-25557
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / Metrics, Runtime / 
> REST, Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Currently, the "checkpointed data size" is the incremental checkpoint size 
> if incremental checkpointing is enabled, otherwise the full checkpoint size. 
> This makes it hard for users to know the exact incremental and full 
> checkpoint sizes.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26062) [Changelog] Non-deterministic recovery of PriorityQueue states

2022-02-09 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17489930#comment-17489930
 ] 

Yun Tang commented on FLINK-26062:
--

I think this is also true for the InternalPriorityQueue#peek() operation, as 
the order is non-deterministic on recovery. Since peek is not a writing 
operation, the changelog cannot record it. However, this might not affect the 
expected behavior, right?

> [Changelog] Non-deterministic recovery of PriorityQueue states
> --
>
> Key: FLINK-26062
> URL: https://issues.apache.org/jira/browse/FLINK-26062
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.15.0
>Reporter: Roman Khachatryan
>Assignee: Roman Khachatryan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Currently, InternalPriorityQueue.poll() is logged as a separate operation, 
> without specifying the element that has been polled. On recovery, this 
> recorded poll() is replayed.
> However, this is not deterministic because the order of PQ elements with 
> equal priority is not specified. For example, TimerHeapInternalTimer only 
> compares timestamps, which are often equal. This results in polling timers 
> from the queue in the wrong order => dropping timers => not firing timers.
>  
> ProcessingTimeWindowCheckpointingITCase.testAggregatingSlidingProcessingTimeWindow
>  fails with materialization enabled and using heap state backend (both 
> in-memory and fs-based implementations).
>  
> Proposed solution is to replace poll with remove operation (which is based on 
> equality).
>  
> cc: [~masteryhx], [~ym], [~yunta]
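The ordering pitfall described above can be illustrated outside Flink with a plain java.util.PriorityQueue (a hypothetical stand-in, not the actual timer heap): two heaps holding the same timers with equal timestamps can poll them in different orders, so replaying a recorded poll() against a differently laid-out heap may remove the wrong element, whereas remove(element) is independent of heap layout.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Minimal illustration (not Flink code): the same two timers with equal
// timestamps are polled in different orders depending on insertion order,
// i.e. on the heap's internal layout.
public class PollOrderDemo {
    // Timer compared by timestamp only, like TimerHeapInternalTimer.
    static final class Timer {
        final long ts; final int id;
        Timer(long ts, int id) { this.ts = ts; this.id = id; }
    }

    static int firstPolledId(int... insertionOrder) {
        PriorityQueue<Timer> heap =
                new PriorityQueue<>(Comparator.comparingLong((Timer t) -> t.ts));
        for (int id : insertionOrder) {
            heap.offer(new Timer(5L, id)); // equal timestamps on purpose
        }
        return heap.poll().id;
    }

    public static void main(String[] args) {
        // Same element set, different internal layout, different poll result.
        System.out.println(firstPolledId(1, 2)); // 1
        System.out.println(firstPolledId(2, 1)); // 2
    }
}
```

This is why logging a remove of the concrete element, rather than replaying poll(), gives deterministic recovery.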





[jira] [Commented] (FLINK-25835) The task initialization duration is recorded in logs

2022-01-30 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17484354#comment-17484354
 ] 

Yun Tang commented on FLINK-25835:
--

The current state backend already logs the time taken to restore the state 
handles; would that be enough?

> The task initialization duration is recorded in logs
> 
>
> Key: FLINK-25835
> URL: https://issues.apache.org/jira/browse/FLINK-25835
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.12.2, 1.15.0
>Reporter: Bo Cui
>Priority: Major
>  Labels: pull-request-available
>
> [https://github.com/apache/flink/blob/a543e658acfbc22c1579df0d043654037b9ec4b0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L644]
> We are testing the time of state backend initialization for different data 
> volumes. However, neither the task initialization time nor the time taken to 
> restore state in the backend can be obtained from the log file.





[jira] [Updated] (FLINK-25862) Refactor SharedStateRegistry to not limit StreamStateHandle to register/unregister

2022-01-30 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-25862:
-
Summary: Refactor SharedStateRegistry to not limit StreamStateHandle to 
register/unregister  (was: Refactor SharedStateRegistry to limit 
StreamStateHandle to register/unregister)

> Refactor SharedStateRegistry to not limit StreamStateHandle to 
> register/unregister
> --
>
> Key: FLINK-25862
> URL: https://issues.apache.org/jira/browse/FLINK-25862
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: Yun Tang
>Priority: Major
>
> Current implementation of SharedStateRegistry uses `StreamStateHandle` 
> to register and unregister. This limits the usage for other components, 
> such as the change-log state backend handle.





[jira] [Resolved] (FLINK-25767) Translation of page 'Working with State' is incomplete

2022-01-28 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang resolved FLINK-25767.
--
Fix Version/s: 1.14.4
   Resolution: Fixed

Merged
master: a424a9c7336c2e2c9de8406121efb5c0bb7257ea
release-1.14: 517436abf607f2750e2c58b1fb659a86360ac43f

> Translation of page 'Working with State' is incomplete
> --
>
> Key: FLINK-25767
> URL: https://issues.apache.org/jira/browse/FLINK-25767
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.14.3
>Reporter: Yao Zhang
>Assignee: Yao Zhang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.14.4
>
>
> The translation of page [Working with State | Apache 
> Flink|https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/datastream/fault-tolerance/state/]
>  is incomplete.





[jira] [Created] (FLINK-25872) Restoring from incremental checkpoint with changelog state-backend cannot work well with snapshot CLAIM mode

2022-01-28 Thread Yun Tang (Jira)
Yun Tang created FLINK-25872:


 Summary: Restoring from incremental checkpoint with changelog 
state-backend cannot work well with snapshot CLAIM mode 
 Key: FLINK-25872
 URL: https://issues.apache.org/jira/browse/FLINK-25872
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing, Runtime / State Backends
Reporter: Yun Tang


If we restore from an incremental checkpoint with the changelog state-backend 
enabled in snapshot CLAIM mode, the restored checkpoint would be discarded on 
subsume, which could lead to the private state in the incremental state handle 
being deleted by mistake. This bug is similar to FLINK-25478.





[jira] [Created] (FLINK-25862) Refactor SharedStateRegistry to limit StreamStateHandle to register/unregister

2022-01-28 Thread Yun Tang (Jira)
Yun Tang created FLINK-25862:


 Summary: Refactor SharedStateRegistry to limit StreamStateHandle 
to register/unregister
 Key: FLINK-25862
 URL: https://issues.apache.org/jira/browse/FLINK-25862
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing, Runtime / State Backends
Reporter: Yun Tang


Current implementation of SharedStateRegistry uses `StreamStateHandle` to 
register and unregister. This limits the usage for other components, such as 
the change-log state backend handle.





[jira] [Reopened] (FLINK-25343) HBaseConnectorITCase.testTableSourceSinkWithDDL fail on azure

2022-01-26 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reopened FLINK-25343:
--

Reopen the issue as another instance happened:
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=30251&view=logs&j=d44f43ce-542c-597d-bf94-b0718c71e5e8&t=ed165f3f-d0f6-524b-5279-86f8ee7d0e2d


> HBaseConnectorITCase.testTableSourceSinkWithDDL fail on azure
> -
>
> Key: FLINK-25343
> URL: https://issues.apache.org/jira/browse/FLINK-25343
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / HBase
>Reporter: Yun Gao
>Assignee: Jing Ge
>Priority: Minor
>  Labels: test-stability
>
> {code:java}
> Dec 15 16:54:20   at 
> org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> Dec 15 16:54:20   at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> Dec 15 16:54:20   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> Dec 15 16:54:20   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> Dec 15 16:54:20   at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
> Dec 15 16:54:20   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> Dec 15 16:54:20   at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> Dec 15 16:54:20   at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> Dec 15 16:54:20   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:367)
> Dec 15 16:54:20   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:274)
> Dec 15 16:54:20   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
> Dec 15 16:54:20   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:161)
> Dec 15 16:54:20   at 
> org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:290)
> Dec 15 16:54:20   at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:242)
> Dec 15 16:54:20   at 
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:121)
> Dec 15 16:54:20 
> Dec 15 16:54:21 
> Dec 15 16:54:21 Results :
> Dec 15 16:54:21 
> Dec 15 16:54:21 Failed tests: 
> Dec 15 16:54:21   HBaseConnectorITCase.testTableSourceSinkWithDDL:371 
> expected:<8> but was:<3>
> Dec 15 16:54:21 
> Dec 15 16:54:21 Tests run: 9, Failures: 1, Errors: 0, Skipped: 0
> Dec 15 16:54:21 
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=28204&view=logs&j=d44f43ce-542c-597d-bf94-b0718c71e5e8&t=ed165f3f-d0f6-524b-5279-86f8ee7d0e2d&l=12375





[jira] [Resolved] (FLINK-25557) Introduce incremental/full checkpoint size stats

2022-01-26 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang resolved FLINK-25557.
--
Fix Version/s: 1.15.0
   Resolution: Fixed

merged in master:
d34653c2fbdbf962ec5db07cadde2bc6db14b24f
b2968c76a611d9e8479ef9acff8079f99b34ab12
ca519f68fa9e1db56daee9435be5e30530da1e70

> Introduce incremental/full checkpoint size stats
> 
>
> Key: FLINK-25557
> URL: https://issues.apache.org/jira/browse/FLINK-25557
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / Metrics, Runtime / 
> REST, Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Currently, the "checkpointed data size" would be incremental checkpoint size 
> if incremental checkpoint is enabled, otherwise full checkpoint size. This is 
> not friendly for users to know the exact incremental/full checkpoint size.





[jira] [Created] (FLINK-25816) Changelog keyed state backend would come across NPE during notification

2022-01-25 Thread Yun Tang (Jira)
Yun Tang created FLINK-25816:


 Summary: Changelog keyed state backend would come across NPE 
during notification
 Key: FLINK-25816
 URL: https://issues.apache.org/jira/browse/FLINK-25816
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing, Runtime / State Backends
Reporter: Yun Tang
Assignee: Yun Tang
 Fix For: 1.15.0


Instance: 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=30158&view=logs&j=a57e0635-3fad-5b08-57c7-a4142d7d6fa9&t=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7

{code:java}
Caused by: java.lang.NullPointerException
at 
org.apache.flink.state.changelog.ChangelogKeyedStateBackend.notifyCheckpointAborted(ChangelogKeyedStateBackend.java:536)
at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.notifyCheckpointAborted(StreamOperatorStateHandler.java:298)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.notifyCheckpointAborted(AbstractStreamOperator.java:383)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointAborted(AbstractUdfStreamOperator.java:132)
at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointAborted(RegularOperatorChain.java:158)
at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:406)
at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointAborted(SubtaskCheckpointCoordinatorImpl.java:352)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointAbortAsync$15(StreamTask.java:1327)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$17(StreamTask.java:1350)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:802)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:751)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.lang.Thread.run(Thread.java:748)

{code}






[jira] [Commented] (FLINK-15507) Activate local recovery for RocksDB backends by default

2022-01-24 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17481203#comment-17481203
 ] 

Yun Tang commented on FLINK-15507:
--

[~pnowojski] TaskStateManagerImpl cleans up the local checkpoint when it is 
notified of an aborted checkpoint (see 
https://github.com/apache/flink/blob/42556522637bf7b6f54809afec08fa0899f3e8fd/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java#L239).

> Activate local recovery for RocksDB backends by default
> ---
>
> Key: FLINK-15507
> URL: https://issues.apache.org/jira/browse/FLINK-15507
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Stephan Ewen
>Assignee: Yuan Mei
>Priority: Major
>  Labels: auto-deprioritized-critical, auto-unassigned, 
> pull-request-available
>
> For the RocksDB state backend, local recovery has no overhead when 
> incremental checkpoints are used. 
> It should be activated by default, because it greatly helps with recovery.





[jira] [Commented] (FLINK-25200) Implement duplicating for s3 filesystem

2022-01-24 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17481048#comment-17481048
 ] 

Yun Tang commented on FLINK-25200:
--

[~pnowojski] Actually, I mean we should implement Aliyun OSS duplication first, 
since OSS could achieve a much better performance improvement. These two tickets 
are certainly independent.

> Implement duplicating for s3 filesystem
> ---
>
> Key: FLINK-25200
> URL: https://issues.apache.org/jira/browse/FLINK-25200
> Project: Flink
>  Issue Type: Sub-task
>  Components: FileSystems
>Reporter: Dawid Wysakowicz
>Priority: Major
> Fix For: 1.15.0
>
>
> We can use https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html





[jira] [Commented] (FLINK-25200) Implement duplicating for s3 filesystem

2022-01-24 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17480904#comment-17480904
 ] 

Yun Tang commented on FLINK-25200:
--

[~pnowojski] To my knowledge, S3 performs a server-side copy when calling 
#copyObject, which cannot be really fast. OSS, by contrast, uses a [shallow 
copy|https://issues.apache.org/jira/browse/HADOOP-15323] for normal-format files 
under 32MB and for multipart-format files, which can finish the copy within 
hundreds of milliseconds (as fast as 10 milliseconds), so it deserves to be 
implemented first.

> Implement duplicating for s3 filesystem
> ---
>
> Key: FLINK-25200
> URL: https://issues.apache.org/jira/browse/FLINK-25200
> Project: Flink
>  Issue Type: Sub-task
>  Components: FileSystems
>Reporter: Dawid Wysakowicz
>Priority: Major
> Fix For: 1.15.0
>
>
> We can use https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html





[jira] [Closed] (FLINK-22578) deployment configuration page miss content navigate menu

2022-01-23 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-22578.

Fix Version/s: (was: 1.15.0)
   (was: 1.13.6)
   Resolution: Implemented

> deployment configuration page miss content navigate menu
> 
>
> Key: FLINK-22578
> URL: https://issues.apache.org/jira/browse/FLINK-22578
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.13.0
>Reporter: 谢波
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> [https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/]
> The page is missing the content navigation menu, which makes it inconvenient to use.





[jira] [Reopened] (FLINK-22578) deployment configuration page miss content navigate menu

2022-01-23 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reopened FLINK-22578:
--

> deployment configuration page miss content navigate menu
> 
>
> Key: FLINK-22578
> URL: https://issues.apache.org/jira/browse/FLINK-22578
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.13.0
>Reporter: 谢波
>Priority: Minor
>  Labels: auto-deprioritized-major
> Fix For: 1.15.0, 1.13.6
>
>
> [https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/]
> The page is missing the content navigation menu, which makes it inconvenient to use.





[jira] [Commented] (FLINK-25767) Translation of page 'Working with State' is incomplete

2022-01-23 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17480830#comment-17480830
 ] 

Yun Tang commented on FLINK-25767:
--

[~paul8263] already assigned to you, please go ahead.

> Translation of page 'Working with State' is incomplete
> --
>
> Key: FLINK-25767
> URL: https://issues.apache.org/jira/browse/FLINK-25767
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.14.3
>Reporter: Yao Zhang
>Assignee: Yao Zhang
>Priority: Major
> Fix For: 1.15.0
>
>
> The translation of page [Working with State | Apache 
> Flink|https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/datastream/fault-tolerance/state/]
>  is incomplete.





[jira] [Assigned] (FLINK-25767) Translation of page 'Working with State' is incomplete

2022-01-23 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-25767:


Assignee: Yao Zhang

> Translation of page 'Working with State' is incomplete
> --
>
> Key: FLINK-25767
> URL: https://issues.apache.org/jira/browse/FLINK-25767
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.14.3
>Reporter: Yao Zhang
>Assignee: Yao Zhang
>Priority: Major
> Fix For: 1.15.0
>
>
> The translation of page [Working with State | Apache 
> Flink|https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/datastream/fault-tolerance/state/]
>  is incomplete.





[jira] [Assigned] (FLINK-15507) Activate local recovery for RocksDB backends by default

2022-01-20 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-15507:


Assignee: Yuan Mei  (was: Yuan Mei)

> Activate local recovery for RocksDB backends by default
> ---
>
> Key: FLINK-15507
> URL: https://issues.apache.org/jira/browse/FLINK-15507
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Stephan Ewen
>Assignee: Yuan Mei
>Priority: Major
>  Labels: auto-deprioritized-critical, auto-unassigned, 
> pull-request-available
>
> For the RocksDB state backend, local recovery has no overhead when 
> incremental checkpoints are used. 
> It should be activated by default, because it greatly helps with recovery.





[jira] [Assigned] (FLINK-15507) Activate local recovery for RocksDB backends by default

2022-01-20 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-15507:


Assignee: Yuan Mei

> Activate local recovery for RocksDB backends by default
> ---
>
> Key: FLINK-15507
> URL: https://issues.apache.org/jira/browse/FLINK-15507
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Stephan Ewen
>Assignee: Yuan Mei
>Priority: Major
>  Labels: auto-deprioritized-critical, auto-unassigned, 
> pull-request-available
>
> For the RocksDB state backend, local recovery has no overhead when 
> incremental checkpoints are used. 
> It should be activated by default, because it greatly helps with recovery.





[jira] [Updated] (FLINK-15507) Activate local recovery for RocksDB backends by default

2022-01-20 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-15507:
-
Fix Version/s: (was: 1.15.0)

> Activate local recovery for RocksDB backends by default
> ---
>
> Key: FLINK-15507
> URL: https://issues.apache.org/jira/browse/FLINK-15507
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Stephan Ewen
>Priority: Major
>  Labels: auto-deprioritized-critical, auto-unassigned, 
> pull-request-available
>
> For the RocksDB state backend, local recovery has no overhead when 
> incremental checkpoints are used. 
> It should be activated by default, because it greatly helps with recovery.





[jira] [Commented] (FLINK-25200) Implement duplicating for s3 filesystem

2022-01-19 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17479139#comment-17479139
 ] 

Yun Tang commented on FLINK-25200:
--

[~akalashnikov] I think we should run tests that mimic how Flink uses the DFS 
for checkpoints. That's why I think we should read existing local files and 
upload them to S3. And certainly, we should test on AWS machines instead of on a 
local machine.

> Implement duplicating for s3 filesystem
> ---
>
> Key: FLINK-25200
> URL: https://issues.apache.org/jira/browse/FLINK-25200
> Project: Flink
>  Issue Type: Sub-task
>  Components: FileSystems
>Reporter: Dawid Wysakowicz
>Priority: Major
> Fix For: 1.15.0
>
>
> We can use https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html





[jira] [Updated] (FLINK-24210) Window related serializer should not return 0 as its serialized length

2022-01-19 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-24210:
-
Fix Version/s: 1.15.0

> Window related serializer should not return 0 as its serialized length
> --
>
> Key: FLINK-24210
> URL: https://issues.apache.org/jira/browse/FLINK-24210
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System, Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Jinzhong Li
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.15.0
>
>
> TimeWindow serializer returns 0 as its length for serialization, which is 
> certainly not correct.
> {code:java}
> public static class Serializer extends TypeSerializerSingleton<TimeWindow> {
> 
> @Override
> public int getLength() {
> return 0;
> }
> }
> {code}
> The current namespace serializer in the state backend does not depend on this 
> interface, so no obvious bug has ever been reported.
> Moreover, this bug also occurs in other window related serializers.
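As a sketch of why 0 is wrong (an assumed layout in stand-alone code, not the actual Flink patch): TimeWindow's serializer writes the window's start and end as two longs, so a corrected getLength() for this fixed-length type should report 16 bytes.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Minimal stand-alone model (not Flink code): serializing a time window's
// two long fields emits 16 bytes, so getLength() must not return 0.
public class WindowLengthDemo {
    static byte[] serialize(long start, long end) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeLong(start); // mirrors writing the window start
            out.writeLong(end);   // mirrors writing the window end
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // What a corrected getLength() should return for this layout.
    static int getLength() {
        return 2 * Long.BYTES; // 16
    }

    public static void main(String[] args) {
        System.out.println(serialize(0L, 100L).length); // 16
        System.out.println(getLength());                // 16
    }
}
```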





[jira] [Resolved] (FLINK-24210) Window related serializer should not return 0 as its serialized length

2022-01-19 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang resolved FLINK-24210.
--
Resolution: Fixed

Merged in master: 573d2b038f8fd5dc1177733ee3087c5b6c847fa4

> Window related serializer should not return 0 as its serialized length
> --
>
> Key: FLINK-24210
> URL: https://issues.apache.org/jira/browse/FLINK-24210
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System, Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Jinzhong Li
>Priority: Major
>  Labels: pull-request-available, stale-assigned
>
> TimeWindow serializer returns 0 as its length for serialization, which is 
> certainly not correct.
> {code:java}
> public static class Serializer extends TypeSerializerSingleton<TimeWindow> {
> 
> @Override
> public int getLength() {
> return 0;
> }
> }
> {code}
> The current namespace serializer in the state backend does not depend on this 
> interface, so no obvious bug has ever been reported.
> Moreover, this bug also occurs in other window related serializers.





[jira] [Commented] (FLINK-25200) Implement duplicating for s3 filesystem

2022-01-19 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17478674#comment-17478674
 ] 

Yun Tang commented on FLINK-25200:
--

For general RocksDB incremental checkpoints, we should also test uploading 64MB 
files vs. copying 64MB files, as RocksDB uses 64MB SST files by default.

Moreover, I noticed that the write throughput can reach 100MB/s; from my 
understanding, this looks really good in a cloud environment. Did you verify 
that the test logic reads local files and then writes to remote S3, instead of 
writing directly to remote S3?

> Implement duplicating for s3 filesystem
> ---
>
> Key: FLINK-25200
> URL: https://issues.apache.org/jira/browse/FLINK-25200
> Project: Flink
>  Issue Type: Sub-task
>  Components: FileSystems
>Reporter: Dawid Wysakowicz
>Priority: Major
> Fix For: 1.15.0
>
>
> We can use https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html





[jira] [Closed] (FLINK-25494) Duplicate element serializer during DefaultOperatorStateBackendSnapshotStrategy#syncPrepareResources

2022-01-18 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-25494.

Resolution: Invalid

> Duplicate element serializer during 
> DefaultOperatorStateBackendSnapshotStrategy#syncPrepareResources
> 
>
> Key: FLINK-25494
> URL: https://issues.apache.org/jira/browse/FLINK-25494
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.13.6, 1.14.4
>
>
> Currently, during 
> DefaultOperatorStateBackendSnapshotStrategy#syncPrepareResources, it will 
> copy the array list serializer via PartitionableListState#deepCopy. However, 
> it just initialize another ArrayListSerializer and not duplicate the internal 
> state serializer:
>  
> See "{{{}internalListCopySerializer{}}}":
>  
> {code:java}
> private PartitionableListState(
> RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo, ArrayList<S> 
> internalList) {
> this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo);
> this.internalList = Preconditions.checkNotNull(internalList);
> this.internalListCopySerializer =
> new 
> ArrayListSerializer<>(stateMetaInfo.getPartitionStateSerializer());
> } {code}
>  
> This could cause unexpected problems when the Kryo serializer is used.
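A minimal model of the described bug (the classes below are hypothetical stand-ins, not the Flink ones): the "deep" copy reuses the element serializer, so a stateful serializer such as KryoSerializer ends up shared between the original and the copy; duplicating it as well isolates the copy.

```java
// Sketch under assumed semantics: a list serializer whose copy must also
// duplicate its stateful element serializer to avoid sharing mutable state.
public class DuplicateSerializerDemo {
    // Stand-in for a stateful serializer such as KryoSerializer.
    static final class StatefulSerializer {
        int scratch; // mutable per-instance state, unsafe to share
        StatefulSerializer duplicate() { return new StatefulSerializer(); }
    }

    static final class ListSerializer {
        final StatefulSerializer element;
        ListSerializer(StatefulSerializer element) { this.element = element; }

        // Buggy copy: the element serializer is still shared.
        ListSerializer shallowCopy() { return new ListSerializer(element); }

        // Fixed copy: the element serializer is duplicated as well.
        ListSerializer deepCopy() { return new ListSerializer(element.duplicate()); }
    }

    public static void main(String[] args) {
        ListSerializer original = new ListSerializer(new StatefulSerializer());
        System.out.println(original.shallowCopy().element == original.element); // true: shared
        System.out.println(original.deepCopy().element == original.element);    // false: isolated
    }
}
```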





[jira] [Assigned] (FLINK-25580) Minor improvements for RocksDBIncrementalCheckpointUtils

2022-01-16 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-25580:


Assignee: Aitozi

> Minor improvements for RocksDBIncrementalCheckpointUtils
> 
>
> Key: FLINK-25580
> URL: https://issues.apache.org/jira/browse/FLINK-25580
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Aitozi
>Assignee: Aitozi
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> {{RocksDBIncrementalCheckpointUtils}} uses 
> {{intersectGroup.getNumberOfKeyGroups() * overlapFraction * overlapFraction}} 
> to evaluate the score of a state handle. Its meaning is not so explicit. 
> The rule to choose the handle should be: 
> 1. Choose the bigger intersecting group range first.
> 2. If the intersecting group ranges are the same, use the handle with the 
> higher overlap fraction, which means fewer iterator delete operations.
> After we introduce the {{deleteRange}} api, this rule can still work. 
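The scoring above can be sketched as follows (assumed semantics; the helper is illustrative, not the actual RocksDBIncrementalCheckpointUtils code): with score = intersectingKeyGroups * overlapFraction², a handle that exactly matches the target range beats a wider handle with the same intersection, because the wider handle would need more deletions.

```java
// Illustrative scoring of candidate state handles against a target
// key-group range, using the formula quoted in the ticket.
public class HandleScoreDemo {
    // score = numIntersectingKeyGroups * overlapFraction^2,
    // where overlapFraction = intersect / handleRangeSize (ranges inclusive).
    static double score(int handleStart, int handleEnd, int targetStart, int targetEnd) {
        int intersect = Math.min(handleEnd, targetEnd) - Math.max(handleStart, targetStart) + 1;
        if (intersect <= 0) {
            return 0.0; // no overlap at all
        }
        double overlapFraction = (double) intersect / (handleEnd - handleStart + 1);
        return intersect * overlapFraction * overlapFraction;
    }

    public static void main(String[] args) {
        // Target range [0, 63]. A handle covering exactly [0, 63] beats a
        // wider handle [0, 127]: same intersection, higher overlap fraction.
        System.out.println(score(0, 63, 0, 63));  // 64.0
        System.out.println(score(0, 127, 0, 63)); // 16.0
    }
}
```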





[jira] [Updated] (FLINK-25580) Minor improvements for RocksDBIncrementalCheckpointUtils

2022-01-16 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-25580:
-
Fix Version/s: 1.15.0

> Minor improvements for RocksDBIncrementalCheckpointUtils
> 
>
> Key: FLINK-25580
> URL: https://issues.apache.org/jira/browse/FLINK-25580
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Aitozi
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> {{RocksDBIncrementalCheckpointUtils}} uses 
> {{intersectGroup.getNumberOfKeyGroups() * overlapFraction * overlapFraction}} 
> to evaluate the score of a state handle. Its meaning is not so explicit. 
> The rule to choose the handle should be: 
> 1. Choose the bigger intersecting group range first.
> 2. If the intersecting group ranges are the same, use the handle with the 
> higher overlap fraction, which means fewer iterator delete operations.
> After we introduce the {{deleteRange}} api, this rule can still work. 





[jira] [Resolved] (FLINK-25580) Minor improvements for RocksDBIncrementalCheckpointUtils

2022-01-16 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang resolved FLINK-25580.
--
Resolution: Fixed

Merged in master: b7fd63b41500dbfeadbf2ee48e1a83d77cd1259b

> Minor improvements for RocksDBIncrementalCheckpointUtils
> 
>
> Key: FLINK-25580
> URL: https://issues.apache.org/jira/browse/FLINK-25580
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Aitozi
>Priority: Minor
>  Labels: pull-request-available
>
> {{RocksDBIncrementalCheckpointUtils}} uses 
> {{intersectGroup.getNumberOfKeyGroups() * overlapFraction * overlapFraction}} 
> to evaluate the score of a state handle. Its meaning is not very explicit. 
> The rule for choosing the handle should be: 
> 1. Choose the bigger intersecting key-group range first.
> 2. If the intersecting key-group ranges are the same, use the handle with the 
> higher overlap fraction, which means fewer iterator delete operations.
> After we introduce the {{deleteRange}} API, this rule can still work.





[jira] [Closed] (FLINK-24797) GlobalWindow.Serializer#getLength() provides a wrong length.

2022-01-14 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-24797.

Resolution: Information Provided

[~bx123] Thanks for your effort in finding this problem. However, this problem 
is already known and is a duplicate of FLINK-24210.

> GlobalWindow.Serializer#getLength() provides a wrong length.
> 
>
> Key: FLINK-24797
> URL: https://issues.apache.org/jira/browse/FLINK-24797
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.12.0, 1.13.0, 1.14.0
>Reporter: bx123
>Priority: Minor
>
> The returned value in the current version is 0, while the
> serialize() method actually writes one byte, so the correct value seems to be 1.
>  
> I am wondering whether there may exist any serializer that legitimately 
> returns length 0?
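The invariant at stake can be shown with a minimal sketch. This is not Flink's actual TypeSerializer API; it only illustrates that a fixed-length serializer's getLength() must equal the number of bytes its serialize() writes.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Minimal sketch (not Flink's actual TypeSerializer API): for a fixed-length
// serializer, getLength() must equal the number of bytes serialize() writes.
// GlobalWindow.Serializer writes one byte, so it should report 1, not 0.
public class OneByteSerializer {
    int getLength() {
        return 1; // must match the single byte written below
    }

    void serialize(DataOutputStream out) throws IOException {
        out.writeByte(0); // writes exactly one byte, like GlobalWindow.Serializer
    }

    public static void main(String[] args) throws IOException {
        OneByteSerializer s = new OneByteSerializer();
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        s.serialize(new DataOutputStream(buf));
        System.out.println(buf.size() == s.getLength()); // prints "true"
    }
}
```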





[jira] [Updated] (FLINK-23378) The last timer of ContinuousProcessingTimeTrigger cannot fire

2022-01-12 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-23378:
-
Fix Version/s: (was: 1.14.3)

> The last timer of ContinuousProcessingTimeTrigger cannot fire
> --
>
> Key: FLINK-23378
> URL: https://issues.apache.org/jira/browse/FLINK-23378
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.12.3
>Reporter: frey
>Priority: Major
>  Labels: pull-request-available
>
> When using a tumbling window with ProcessingTime semantics and replacing the 
> default trigger with ContinuousProcessingTimeTrigger, the timestamp of the 
> last timer equals the start time of the next window, so the computation for 
> the last timer can never be triggered.
> A possible fix is to FIRE in onProcessingTime when time == window.maxTimestamp().
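The timer arithmetic behind this ticket can be sketched as follows. The numbers, class, and method names are illustrative only; this is not Flink's actual Trigger implementation.

```java
// Illustrative sketch of the issue: with a tumbling window of [0, 10000) ms and
// a 3000 ms ContinuousProcessingTimeTrigger, timers fire at 3000, 6000, 9000,
// and the next timer lands at 12000 -- the start of the next window -- so the
// final partial interval never fires. Capping the timer at window.maxTimestamp()
// (= windowEnd - 1) is one way to express the proposed fix; this is not
// Flink's actual implementation.
public class ContinuousTimerSketch {
    /** Next continuous timer, capped at the window's max timestamp. */
    static long nextTimer(long current, long interval, long windowMaxTimestamp) {
        long next = current - (current % interval) + interval;
        return Math.min(next, windowMaxTimestamp);
    }

    public static void main(String[] args) {
        long windowMax = 9_999; // window [0, 10_000): maxTimestamp = 9_999
        // Without the cap, the timer after 9_000 would be 12_000, past the window.
        System.out.println(nextTimer(9_000, 3_000, windowMax)); // prints "9999"
    }
}
```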





[jira] [Commented] (FLINK-23378) The last timer of ContinuousProcessingTimeTrigger cannot fire

2022-01-12 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17474365#comment-17474365
 ] 

Yun Tang commented on FLINK-23378:
--

[~frey] Please make sure the title and description of this ticket are in English.

I see the PR has not been merged, so why has this ticket been closed as "Fixed"? 
I will reopen this ticket and remove the fix-version field.

> The last timer of ContinuousProcessingTimeTrigger cannot fire
> --
>
> Key: FLINK-23378
> URL: https://issues.apache.org/jira/browse/FLINK-23378
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.12.3
>Reporter: frey
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.3
>
>
> When using a tumbling window with ProcessingTime semantics and replacing the 
> default trigger with ContinuousProcessingTimeTrigger, the timestamp of the 
> last timer equals the start time of the next window, so the computation for 
> the last timer can never be triggered.
> A possible fix is to FIRE in onProcessingTime when time == window.maxTimestamp().





[jira] [Reopened] (FLINK-23378) The last timer of ContinuousProcessingTimeTrigger cannot fire

2022-01-12 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reopened FLINK-23378:
--

> The last timer of ContinuousProcessingTimeTrigger cannot fire
> --
>
> Key: FLINK-23378
> URL: https://issues.apache.org/jira/browse/FLINK-23378
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.12.3
>Reporter: frey
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.3
>
>
> When using a tumbling window with ProcessingTime semantics and replacing the 
> default trigger with ContinuousProcessingTimeTrigger, the timestamp of the 
> last timer equals the start time of the next window, so the computation for 
> the last timer can never be triggered.
> A possible fix is to FIRE in onProcessingTime when time == window.maxTimestamp().





[jira] [Resolved] (FLINK-25601) Update 'state.backend' in flink-conf.yaml

2022-01-11 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang resolved FLINK-25601.
--
Resolution: Fixed

merged in master: 41e96d2488e4caba17eef5172bb5ae493eb9c742

> Update 'state.backend' in flink-conf.yaml
> -
>
> Key: FLINK-25601
> URL: https://issues.apache.org/jira/browse/FLINK-25601
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.14.2
>Reporter: Ada Wong
>Assignee: Ada Wong
>Priority: Major
>  Labels: pull-request-available
>
> The value and comments of 'state.backend' in flink-conf.yaml are deprecated.
> {code:java}
> # Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
> # .
> #
> # state.backend: filesystem{code}
> We should update it to the following.
>  
> {code:java}
> # Supported backends are 'hashmap', 'rocksdb', or the
> # .
> #
> # state.backend: hashmap {code}
>  
>  
>  





[jira] [Assigned] (FLINK-25478) Same materialized state handle should not register multi times

2022-01-11 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-25478:


Assignee: Yun Tang

> Same materialized state handle should not register multi times
> --
>
> Key: FLINK-25478
> URL: https://issues.apache.org/jira/browse/FLINK-25478
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Critical
> Fix For: 1.15.0
>
>
> Currently, changelog materialization would call RocksDB state backend's 
> snapshot method to generate {{IncrementalRemoteKeyedStateHandle}} as 
> ChangelogStateBackendHandleImpl's materialized artifacts. And before next 
> materialization, it will always report the same 
> {{IncrementalRemoteKeyedStateHandle}} as before.
> It's fine to register this for the 1st time. However, for the 2nd time to 
> register {{IncrementalRemoteKeyedStateHandle}} (via 
> {{{}ChangelogStateBackendHandleImpl#registerSharedStates{}}}), it will 
> discard the private state artifacts without checking the registration reference:
> IncrementalRemoteKeyedStateHandle:
> {code:java}
> public void discardState() throws Exception {
> try {
> StateUtil.bestEffortDiscardAllStateObjects(privateState.values());
> } catch (Exception e) {
> LOG.warn("Could not properly discard misc file states.", e);
> }
> }
> {code}
> Thus, this would delete the private state (such as RocksDB's MANIFEST), and 
> upon restore, the job would report a FileNotFoundException.
>  
> {code:java}
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught 
> unexpected exception.
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:403)
> at 
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:477)
>  
> at 
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:90)
>  
>     at 
> org.apache.flink.state.changelog.ChangelogStateBackend.lambda$createKeyedStateBackend$1(ChangelogStateBackend.java:153)
> at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:68)
>     at 
> org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:221)
>  
> at 
> org.apache.flink.state.changelog.ChangelogStateBackend.createKeyedStateBackend(ChangelogStateBackend.java:145)
>  
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
>  
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>  
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>  
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
>  
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
>  
> ... 10 more
> Caused by: java.io.FileNotFoundException: x
> at 
> org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.open(PluginFileSystemFactory.java:127)
>  
> at 
> org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
>  
> at 
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:127)
>  
> at 
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:110)
>     at 
> org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:49)
>  
> at 
> java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
>  ~[?:1.8.0_102]
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)
>  ~[?:1.8.0_102]
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)
>  ~[?:1.8.0_102]
> ... 1 more {code}
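The missing safeguard described above can be sketched with a reference-counted handle: state registered by multiple checkpoints may only delete its files when the last registration is released. This is an invented illustration, not Flink's SharedStateRegistry or IncrementalRemoteKeyedStateHandle code.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Minimal sketch of the missing safeguard: a handle that is registered by
// multiple checkpoints must only delete its files when the last registration
// is released. Invented illustration, not Flink's actual classes.
public class RefCountedHandle {
    private final AtomicInteger refCount = new AtomicInteger();
    private boolean discarded;

    void register() { refCount.incrementAndGet(); }

    /** Physically discard state only when no registration remains. */
    void unregister() {
        if (refCount.decrementAndGet() == 0) {
            discarded = true; // real code would delete files such as the MANIFEST here
        }
    }

    boolean isDiscarded() { return discarded; }

    public static void main(String[] args) {
        RefCountedHandle h = new RefCountedHandle();
        h.register();   // first materialization report
        h.register();   // same handle reported again before the next materialization
        h.unregister(); // one reference released: the files must survive
        System.out.println(h.isDiscarded()); // prints "false"
    }
}
```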





[jira] [Commented] (FLINK-25601) Update 'state.backend' in flink-conf.yaml

2022-01-10 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17472503#comment-17472503
 ] 

Yun Tang commented on FLINK-25601:
--

[~ana4] Already assigned to you, please go ahead.

> Update 'state.backend' in flink-conf.yaml
> -
>
> Key: FLINK-25601
> URL: https://issues.apache.org/jira/browse/FLINK-25601
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.14.2
>Reporter: Ada Wong
>Assignee: Ada Wong
>Priority: Major
>
> The value and comments of 'state.backend' in flink-conf.yaml are deprecated.
> {code:java}
> # Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
> # .
> #
> # state.backend: filesystem{code}
> We should update it to the following.
>  
> {code:java}
> # Supported backends are 'hashmap', 'rocksdb', or the
> # .
> #
> # state.backend: hashmap {code}
>  
>  
>  





[jira] [Assigned] (FLINK-25601) Update 'state.backend' in flink-conf.yaml

2022-01-10 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-25601:


Assignee: Ada Wong

> Update 'state.backend' in flink-conf.yaml
> -
>
> Key: FLINK-25601
> URL: https://issues.apache.org/jira/browse/FLINK-25601
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.14.2
>Reporter: Ada Wong
>Assignee: Ada Wong
>Priority: Major
>
> The value and comments of 'state.backend' in flink-conf.yaml are deprecated.
> {code:java}
> # Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
> # .
> #
> # state.backend: filesystem{code}
> We should update it to the following.
>  
> {code:java}
> # Supported backends are 'hashmap', 'rocksdb', or the
> # .
> #
> # state.backend: hashmap {code}
>  
>  
>  





[jira] [Commented] (FLINK-25557) Introduce incremental/full checkpoint size stats

2022-01-09 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17471713#comment-17471713
 ] 

Yun Tang commented on FLINK-25557:
--

[~liufangqi] Thanks for your attention. I will introduce the new attributes 
"incremental checkpoint size" and "full checkpoint size" to replace the original 
"checkpointed state size" shown on the web UI and in the REST APIs. This part of 
the code has already been done in my private Flink branch, and I think you could 
help review the PR to improve the community together.

> Introduce incremental/full checkpoint size stats
> 
>
> Key: FLINK-25557
> URL: https://issues.apache.org/jira/browse/FLINK-25557
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / Metrics, Runtime / 
> REST, Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
>
> Currently, the "checkpointed data size" is the incremental checkpoint size 
> if incremental checkpointing is enabled, and the full checkpoint size 
> otherwise. This makes it hard for users to know the exact incremental and 
> full checkpoint sizes.





[jira] [Resolved] (FLINK-25536) Minor Fix: Adjust the order of variable declaration and comment in StateAssignmentOperation

2022-01-07 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang resolved FLINK-25536.
--
Fix Version/s: 1.15.0
   Resolution: Fixed

merged in master: 34720dc4c883b70114668fdcb9d7d1190edfbbf9

> Minor Fix: Adjust the order of variable declaration and comment in 
> StateAssignmentOperation
> ---
>
> Key: FLINK-25536
> URL: https://issues.apache.org/jira/browse/FLINK-25536
> Project: Flink
>  Issue Type: Improvement
>Reporter: Junfan Zhang
>Assignee: Junfan Zhang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>






[jira] [Commented] (FLINK-25360) Add State Desc to CheckpointMetadata

2022-01-06 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17470355#comment-17470355
 ] 

Yun Tang commented on FLINK-25360:
--

[~liufangqi] I suggest creating a design doc via Google Docs first. After 
several rounds of review, let's create a FLIP and launch a discussion.

> Add State Desc to CheckpointMetadata
> 
>
> Key: FLINK-25360
> URL: https://issues.apache.org/jira/browse/FLINK-25360
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: 刘方奇
>Priority: Major
> Attachments: image-2021-12-17-20-01-42-423.png
>
>
> Currently we can't get the state descriptor info from the checkpoint metadata. 
> For example, if we use the state-processor-api to load and then rewrite state, 
> we can't use the state flexibly. 
> There may be other cases where we need the state descriptor, so can we add 
> this info?





[jira] [Created] (FLINK-25557) Introduce incremental/full checkpoint size stats

2022-01-06 Thread Yun Tang (Jira)
Yun Tang created FLINK-25557:


 Summary: Introduce incremental/full checkpoint size stats
 Key: FLINK-25557
 URL: https://issues.apache.org/jira/browse/FLINK-25557
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing, Runtime / Metrics, Runtime / 
REST, Runtime / State Backends
Reporter: Yun Tang
Assignee: Yun Tang


Currently, the "checkpointed data size" is the incremental checkpoint size if 
incremental checkpointing is enabled, and the full checkpoint size otherwise. 
This makes it hard for users to know the exact incremental and full checkpoint 
sizes.





[jira] [Assigned] (FLINK-25536) Minor Fix: Adjust the order of variable declaration and comment in StateAssignmentOperation

2022-01-05 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-25536:


Assignee: Junfan Zhang

> Minor Fix: Adjust the order of variable declaration and comment in 
> StateAssignmentOperation
> ---
>
> Key: FLINK-25536
> URL: https://issues.apache.org/jira/browse/FLINK-25536
> Project: Flink
>  Issue Type: Improvement
>Reporter: Junfan Zhang
>Assignee: Junfan Zhang
>Priority: Minor
>  Labels: pull-request-available
>






[jira] [Commented] (FLINK-25536) Minor Fix: Adjust the order of variable declaration and comment in StateAssignmentOperation

2022-01-05 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17469694#comment-17469694
 ] 

Yun Tang commented on FLINK-25536:
--

[~zuston] I think the PR is easy to review. Please remember to fill in the 
description.

> Minor Fix: Adjust the order of variable declaration and comment in 
> StateAssignmentOperation
> ---
>
> Key: FLINK-25536
> URL: https://issues.apache.org/jira/browse/FLINK-25536
> Project: Flink
>  Issue Type: Improvement
>Reporter: Junfan Zhang
>Priority: Minor
>  Labels: pull-request-available
>






[jira] [Commented] (FLINK-25360) Add State Desc to CheckpointMetadata

2022-01-05 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17469693#comment-17469693
 ] 

Yun Tang commented on FLINK-25360:
--

[~liufangqi] I think we could at least write a discussion email about persisting 
additional information in the state descriptor. For the first problem, I think 
you could at least provide a proposal for how to persist the state metadata in 
the checkpoint metadata. To my mind, this could require adding some additional 
interfaces.

> Add State Desc to CheckpointMetadata
> 
>
> Key: FLINK-25360
> URL: https://issues.apache.org/jira/browse/FLINK-25360
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: 刘方奇
>Priority: Major
> Attachments: image-2021-12-17-20-01-42-423.png
>
>
> Currently we can't get the state descriptor info from the checkpoint metadata. 
> For example, if we use the state-processor-api to load and then rewrite state, 
> we can't use the state flexibly. 
> There may be other cases where we need the state descriptor, so can we add 
> this info?





[jira] [Updated] (FLINK-25458) Support local recovery

2022-01-05 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25458?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-25458:
-
Description: 
The current changelog state-backend implementation cannot work well with local 
recovery enabled.

The current periodic materialization calls the state backend's snapshot method 
with a materialization id. However, the current local state management relies 
on the checkpoint id for storing, confirming and discarding. The gap between 
them breaks how local recovery works.

  was:Current changelog state-backend implementation cannot work well with 
local recovery enabled.


> Support local recovery
> --
>
> Key: FLINK-25458
> URL: https://issues.apache.org/jira/browse/FLINK-25458
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: Yun Tang
>Priority: Major
>
> The current changelog state-backend implementation cannot work well with 
> local recovery enabled.
> The current periodic materialization calls the state backend's snapshot 
> method with a materialization id. However, the current local state management 
> relies on the checkpoint id for storing, confirming and discarding. The gap 
> between them breaks how local recovery works.





[jira] [Created] (FLINK-25524) If enabled changelog, RocksDB incremental checkpoint would always be full

2022-01-05 Thread Yun Tang (Jira)
Yun Tang created FLINK-25524:


 Summary: If enabled changelog, RocksDB incremental checkpoint 
would always be full
 Key: FLINK-25524
 URL: https://issues.apache.org/jira/browse/FLINK-25524
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing, Runtime / State Backends
Reporter: Yun Tang


Once changelog is enabled, the RocksDB incremental checkpoint is only 
executed during materialization. During this phase, it leverages the 
{{materialization id}} as the checkpoint id for the RocksDB state backend's 
snapshot method.

However, the current incremental checkpoint mechanism heavily depends on the 
checkpoint id, and {{SortedMap<Long, Set<StateHandleID>> uploadedStateIDs}}, 
keyed by checkpoint id within {{RocksIncrementalSnapshotStrategy}}, is the 
kernel of incremental checkpointing. Once we notify checkpoint complete of the 
previous checkpoint, it removes the uploaded state ids of that checkpoint, so 
we cannot get proper checkpoint information on the next 
RocksDBKeyedStateBackend#snapshot. That is to say, we will always upload all 
RocksDB artifacts.
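The bookkeeping described above can be sketched as follows. This is an illustrative stand-in, not Flink's actual RocksIncrementalSnapshotStrategy code: uploaded files are tracked per checkpoint id, entries are pruned on checkpoint completion, and a lookup under an id that was never recorded (or was already pruned) misses, forcing a full re-upload.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

// Illustrative bookkeeping sketch (not Flink code): uploaded sst files are
// tracked per checkpoint id and pruned once a later checkpoint completes.
// A snapshot that then looks up an already-pruned id finds nothing and must
// re-upload everything.
public class UploadedStateIds {
    private final SortedMap<Long, Set<String>> uploadedStateIDs = new TreeMap<>();

    void recordSnapshot(long checkpointId, Set<String> sstFiles) {
        uploadedStateIDs.put(checkpointId, sstFiles);
    }

    /** Files the next snapshot may skip uploading, based on a previous id. */
    Set<String> previouslyUploaded(long checkpointId) {
        return uploadedStateIDs.getOrDefault(checkpointId, Collections.emptySet());
    }

    void notifyCheckpointComplete(long completedId) {
        uploadedStateIDs.headMap(completedId).clear(); // prune older entries
    }

    public static void main(String[] args) {
        UploadedStateIds ids = new UploadedStateIds();
        ids.recordSnapshot(1L, new HashSet<>(Arrays.asList("000001.sst")));
        ids.notifyCheckpointComplete(2L); // checkpoint 1's entry is pruned
        // The next snapshot cannot find checkpoint 1's files, so it re-uploads all.
        System.out.println(ids.previouslyUploaded(1L).isEmpty()); // prints "true"
    }
}
```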





[jira] [Commented] (FLINK-25478) Same materialized state handle should not register multi times

2022-01-04 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17468426#comment-17468426
 ] 

Yun Tang commented on FLINK-25478:
--

I created a branch to reproduce this problem: 
[https://github.com/Myasuka/flink/tree/FLINK-25478-test] . You can use 
{{ResumeCheckpointManuallyITCase.java}} to reproduce it.

> Same materialized state handle should not register multi times
> --
>
> Key: FLINK-25478
> URL: https://issues.apache.org/jira/browse/FLINK-25478
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: Yun Tang
>Priority: Critical
> Fix For: 1.15.0
>
>
> Currently, changelog materialization would call RocksDB state backend's 
> snapshot method to generate {{IncrementalRemoteKeyedStateHandle}} as 
> ChangelogStateBackendHandleImpl's materialized artifacts. And before next 
> materialization, it will always report the same 
> {{IncrementalRemoteKeyedStateHandle}} as before.
> It's fine to register this for the 1st time. However, for the 2nd time to 
> register {{IncrementalRemoteKeyedStateHandle}} (via 
> {{{}ChangelogStateBackendHandleImpl#registerSharedStates{}}}), it will 
> discard the private state artifacts without checking the registration reference:
> IncrementalRemoteKeyedStateHandle:
> {code:java}
> public void discardState() throws Exception {
> try {
> StateUtil.bestEffortDiscardAllStateObjects(privateState.values());
> } catch (Exception e) {
> LOG.warn("Could not properly discard misc file states.", e);
> }
> }
> {code}
> Thus, this would delete the private state (such as RocksDB's MANIFEST), and 
> upon restore, the job would report a FileNotFoundException.
>  
> {code:java}
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught 
> unexpected exception.
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:403)
> at 
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:477)
>  
> at 
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:90)
>  
>     at 
> org.apache.flink.state.changelog.ChangelogStateBackend.lambda$createKeyedStateBackend$1(ChangelogStateBackend.java:153)
> at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:68)
>     at 
> org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:221)
>  
> at 
> org.apache.flink.state.changelog.ChangelogStateBackend.createKeyedStateBackend(ChangelogStateBackend.java:145)
>  
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
>  
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>  
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>  
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
>  
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
>  
> ... 10 more
> Caused by: java.io.FileNotFoundException: x
> at 
> org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.open(PluginFileSystemFactory.java:127)
>  
> at 
> org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
>  
> at 
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:127)
>  
> at 
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:110)
>     at 
> org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:49)
>  
> at 
> java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
>  ~[?:1.8.0_102]
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)
>  ~[?:1.8.0_102]
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)
>  ~[?:1.8.0_102]
> ... 1 more {code}





[jira] [Updated] (FLINK-25477) The directory structure of the State Backends document is not standardized

2022-01-03 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25477?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-25477:
-
Fix Version/s: 1.14.3

> The directory structure of the State Backends document is not standardized
> --
>
> Key: FLINK-25477
> URL: https://issues.apache.org/jira/browse/FLINK-25477
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Runtime / State Backends
>Reporter: Hangxiang Yu
>Assignee: Hangxiang Yu
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.14.3
>
> Attachments: image-2021-12-29-16-56-24-657.png
>
>
> The State Backends document uses multiple first-level headings. 
> This may cause the directory structure to be displayed incorrectly.
> As the picture shows, the two titles are not in the table of contents on 
> the right.
>  
> !image-2021-12-29-16-56-24-657.png|width=522,height=230!





[jira] [Closed] (FLINK-25477) The directory structure of the State Backends document is not standardized

2022-01-03 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25477?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-25477.


Merged in master: 332eee0a0d6edb9c9aa455d182f79977496af756

 

release-1.14: 7263d977150c7d7fb880a637e2d65996cca7ea3a

 

> The directory structure of the State Backends document is not standardized
> --
>
> Key: FLINK-25477
> URL: https://issues.apache.org/jira/browse/FLINK-25477
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Runtime / State Backends
>Reporter: Hangxiang Yu
>Assignee: Hangxiang Yu
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.15.0
>
> Attachments: image-2021-12-29-16-56-24-657.png
>
>
> The State Backends document uses multiple first-level headings. 
> This may cause the directory structure to be displayed incorrectly.
> As the picture shows, the two titles are not in the table of contents on 
> the right.
>  
> !image-2021-12-29-16-56-24-657.png|width=522,height=230!





[jira] [Updated] (FLINK-25429) Avoid to close output streams twice during uploading changelogs

2021-12-31 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25429?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-25429:
-
Issue Type: Bug  (was: Improvement)

> Avoid to close output streams twice during uploading changelogs
> ---
>
> Key: FLINK-25429
> URL: https://issues.apache.org/jira/browse/FLINK-25429
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> The current uploader implementation closes {{stream}} and {{fsStream}} one 
> by one, which leads to {{fsStream}} being closed twice.
> {code:java}
> try (FSDataOutputStream fsStream = fileSystem.create(path, 
> NO_OVERWRITE)) {
> fsStream.write(compression ? 1 : 0);
> try (OutputStreamWithPos stream = wrap(fsStream); ) {
> final Map> tasksOffsets 
> = new HashMap<>();
> for (UploadTask task : tasks) {
> tasksOffsets.put(task, format.write(stream, 
> task.changeSets));
> }
> FileStateHandle handle = new FileStateHandle(path, 
> stream.getPos());
> // WARN: streams have to be closed before returning the 
> results
> // otherwise JM may receive invalid handles
> return new LocalResult(tasksOffsets, handle);
> }
> }
> {code}
> Not all file systems support closing the same stream twice.
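The double-close hazard can be demonstrated with a small sketch. The wrapper below makes close() idempotent so the nested try-with-resources blocks reach the underlying stream only once; this illustrates the hazard and one possible mitigation, not the fix actually applied in Flink.

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Sketch of one way to tolerate nested try-with-resources over the same
// underlying stream: a wrapper whose close() is idempotent, so the inner
// stream's close and the outer stream's close only reach the underlying
// file-system stream once. Illustration only, not Flink's actual fix.
public class IdempotentCloseStream extends FilterOutputStream {
    private boolean closed;

    IdempotentCloseStream(OutputStream out) { super(out); }

    @Override
    public void close() throws IOException {
        if (!closed) { // make the second close() a no-op
            closed = true;
            super.close();
        }
    }

    public static void main(String[] args) throws IOException {
        final int[] closes = {0};
        OutputStream raw = new ByteArrayOutputStream() {
            @Override
            public void close() throws IOException {
                closes[0]++; // count how often the underlying stream is closed
                super.close();
            }
        };
        IdempotentCloseStream s = new IdempotentCloseStream(raw);
        s.close();
        s.close(); // would be the problematic second close without the guard
        System.out.println(closes[0]); // prints "1"
    }
}
```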





[jira] [Updated] (FLINK-25494) Duplicate element serializer during DefaultOperatorStateBackendSnapshotStrategy#syncPrepareResources

2021-12-30 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-25494:
-
Description: 
Currently, during 
DefaultOperatorStateBackendSnapshotStrategy#syncPrepareResources, it will copy 
the array list serializer via PartitionableListState#deepCopy. However, it just 
initializes another ArrayListSerializer and does not duplicate the internal 
state serializer:

 

See "{{{}internalListCopySerializer{}}}":
 
{code:java}
private PartitionableListState(
RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo, ArrayList<S> 
internalList) {

this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo);
this.internalList = Preconditions.checkNotNull(internalList);
this.internalListCopySerializer =
new 
ArrayListSerializer<>(stateMetaInfo.getPartitionStateSerializer());
} {code}
 

This can cause unexpected problems with the usage of the Kryo serializer.

  was:
Currently, during 
DefaultOperatorStateBackendSnapshotStrategy#syncPrepareResources, it will copy 
the array list serializer via PartitionableListState#deepCopy. However, it just 
initialize another ArrayListSerializer and not duplicate the internal state 
serializer:

 

See "{{{}internalListCopySerializer{}}}":
 
{code:java}
private PartitionableListState(
RegisteredOperatorStateBackendMetaInfo stateMetaInfo, ArrayList 
internalList) {

this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo);
this.internalList = Preconditions.checkNotNull(internalList);
this.internalListCopySerializer =
new 
ArrayListSerializer<>(stateMetaInfo.getPartitionStateSerializer());
} {code}
 

This would cause unexpected problem with usage of kryo serializer.


> Duplicate element serializer during 
> DefaultOperatorStateBackendSnapshotStrategy#syncPrepareResources
> 
>
> Key: FLINK-25494
> URL: https://issues.apache.org/jira/browse/FLINK-25494
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
> Fix For: 1.15.0, 1.13.6, 1.14.3
>
>
> Currently, during 
> DefaultOperatorStateBackendSnapshotStrategy#syncPrepareResources, it will 
> copy the array list serializer via PartitionableListState#deepCopy. However, 
> it just initializes another ArrayListSerializer and does not duplicate the 
> internal state serializer:
>  
> See "{{{}internalListCopySerializer{}}}":
>  
> {code:java}
> private PartitionableListState(
> RegisteredOperatorStateBackendMetaInfo stateMetaInfo, ArrayList 
> internalList) {
> this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo);
> this.internalList = Preconditions.checkNotNull(internalList);
> this.internalListCopySerializer =
> new 
> ArrayListSerializer<>(stateMetaInfo.getPartitionStateSerializer());
> } {code}
>  
> This would cause unexpected problems when the Kryo serializer is used.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25494) Duplicate element serializer during DefaultOperatorStateBackendSnapshotStrategy#syncPrepareResources

2021-12-30 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-25494:
-
Description: 
Currently, during 
DefaultOperatorStateBackendSnapshotStrategy#syncPrepareResources, it will copy 
the array list serializer via PartitionableListState#deepCopy. However, it just 
initializes another ArrayListSerializer and does not duplicate the internal 
state serializer:

 

See "{{{}internalListCopySerializer{}}}":
 
{code:java}
private PartitionableListState(
RegisteredOperatorStateBackendMetaInfo stateMetaInfo, ArrayList 
internalList) {

this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo);
this.internalList = Preconditions.checkNotNull(internalList);
this.internalListCopySerializer =
new 
ArrayListSerializer<>(stateMetaInfo.getPartitionStateSerializer());
} {code}
 

This would cause unexpected problems when the Kryo serializer is used.

  was:
Currently, during 
DefaultOperatorStateBackendSnapshotStrategy#syncPrepareResources, it will copy 
the array list serializer via PartitionableListState#deepCopy. However, it just 
initializes another ArrayListSerializer and does not duplicate the internal 
state serializer:

 

See "{{internalListCopySerializer}}":
 
{code:java}
private PartitionableListState(
RegisteredOperatorStateBackendMetaInfo stateMetaInfo, ArrayList 
internalList) {

this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo);
this.internalList = Preconditions.checkNotNull(internalList);
this.internalListCopySerializer =
new 
ArrayListSerializer<>(stateMetaInfo.getPartitionStateSerializer());
} {code}
 


> Duplicate element serializer during 
> DefaultOperatorStateBackendSnapshotStrategy#syncPrepareResources
> 
>
> Key: FLINK-25494
> URL: https://issues.apache.org/jira/browse/FLINK-25494
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
> Fix For: 1.15.0, 1.13.6, 1.14.3
>
>
> Currently, during 
> DefaultOperatorStateBackendSnapshotStrategy#syncPrepareResources, it will 
> copy the array list serializer via PartitionableListState#deepCopy. However, 
> it just initializes another ArrayListSerializer and does not duplicate the 
> internal state serializer:
>  
> See "{{{}internalListCopySerializer{}}}":
>  
> {code:java}
> private PartitionableListState(
> RegisteredOperatorStateBackendMetaInfo stateMetaInfo, ArrayList 
> internalList) {
> this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo);
> this.internalList = Preconditions.checkNotNull(internalList);
> this.internalListCopySerializer =
> new 
> ArrayListSerializer<>(stateMetaInfo.getPartitionStateSerializer());
> } {code}
>  
> This would cause unexpected problems when the Kryo serializer is used.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25494) Duplicate element serializer during DefaultOperatorStateBackendSnapshotStrategy#syncPrepareResources

2021-12-30 Thread Yun Tang (Jira)
Yun Tang created FLINK-25494:


 Summary: Duplicate element serializer during 
DefaultOperatorStateBackendSnapshotStrategy#syncPrepareResources
 Key: FLINK-25494
 URL: https://issues.apache.org/jira/browse/FLINK-25494
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Reporter: Yun Tang
Assignee: Yun Tang
 Fix For: 1.15.0, 1.13.6, 1.14.3


Currently, during 
DefaultOperatorStateBackendSnapshotStrategy#syncPrepareResources, it will copy 
the array list serializer via PartitionableListState#deepCopy. However, it just 
initializes another ArrayListSerializer and does not duplicate the internal 
state serializer:

 

See "{{internalListCopySerializer}}":
 
{code:java}
private PartitionableListState(
RegisteredOperatorStateBackendMetaInfo stateMetaInfo, ArrayList 
internalList) {

this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo);
this.internalList = Preconditions.checkNotNull(internalList);
this.internalListCopySerializer =
new 
ArrayListSerializer<>(stateMetaInfo.getPartitionStateSerializer());
} {code}
 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-10954) Hardlink from files of previous local stored state might cross devices

2021-12-30 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-10954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-10954.

Resolution: Information Provided

After FLINK-25468, we can at least not fail the job.

> Hardlink from files of previous local stored state might cross devices
> --
>
> Key: FLINK-10954
> URL: https://issues.apache.org/jira/browse/FLINK-10954
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.6.2
>Reporter: Yun Tang
>Priority: Minor
>  Labels: auto-deprioritized-critical, auto-deprioritized-major, 
> auto-unassigned
>
> Currently, local recovery's base directories are initialized from 
> '{{io.tmp.dirs}}' if the parameter '{{taskmanager.state.local.root-dirs}}' is 
> not set. In a Yarn environment, the tmp dirs are replaced by its 
> '{{LOCAL_DIRS}}', which might consist of directories from different devices, 
> such as /dump/1/nm-local-dir, /dump/2/nm-local-dir. The local directory for 
> RocksDB is initialized from IOManager's spillingDirectories, which might be 
> located on a different device from local recovery's folder. However, 
> hard-linking between different devices is not allowed, so it will throw the 
> exception below:
> {code:java}
> java.nio.file.FileSystemException: target -> souce: Invalid cross-device link
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25478) Same materialized state handle should not register multi times

2021-12-30 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-25478:
-
Summary: Same materialized state handle should not register multi times  
(was: Same materialized state handle should register multi times)

> Same materialized state handle should not register multi times
> --
>
> Key: FLINK-25478
> URL: https://issues.apache.org/jira/browse/FLINK-25478
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: Yun Tang
>Priority: Critical
> Fix For: 1.15.0
>
>
> Currently, changelog materialization would call RocksDB state backend's 
> snapshot method to generate {{IncrementalRemoteKeyedStateHandle}} as 
> ChangelogStateBackendHandleImpl's materialized artifacts. And before next 
> materialization, it will always report the same 
> {{IncrementalRemoteKeyedStateHandle}} as before.
> It's fine to register this for the 1st time. However, for the 2nd time to 
> register {{IncrementalRemoteKeyedStateHandle}} (via 
> {{{}ChangelogStateBackendHandleImpl#registerSharedStates{}}}), it will 
> discard the private state artifacts without checking the registration reference:
> IncrementalRemoteKeyedStateHandle:
> {code:java}
> public void discardState() throws Exception {
> try {
> StateUtil.bestEffortDiscardAllStateObjects(privateState.values());
> } catch (Exception e) {
> LOG.warn("Could not properly discard misc file states.", e);
> }
> }
> {code}
> Thus, this would delete the private state (such as RocksDB's MANIFEST), and 
> upon restore, the job would report a FileNotFoundException.
>  
> {code:java}
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught 
> unexpected exception.
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:403)
> at 
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:477)
>  
> at 
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:90)
>  
>     at 
> org.apache.flink.state.changelog.ChangelogStateBackend.lambda$createKeyedStateBackend$1(ChangelogStateBackend.java:153)
> at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:68)
>     at 
> org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:221)
>  
> at 
> org.apache.flink.state.changelog.ChangelogStateBackend.createKeyedStateBackend(ChangelogStateBackend.java:145)
>  
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
>  
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>  
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>  
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
>  
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
>  
> ... 10 more
> Caused by: java.io.FileNotFoundException: x
> at 
> org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.open(PluginFileSystemFactory.java:127)
>  
> at 
> org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
>  
> at 
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:127)
>  
> at 
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:110)
>     at 
> org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:49)
>  
> at 
> java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
>  ~[?:1.8.0_102]
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)
>  ~[?:1.8.0_102]
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)
>  ~[?:1.8.0_102]
> ... 1 more {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25478) Same materialized state handle should register multi times

2021-12-30 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-25478:
-
Parent: FLINK-21352
Issue Type: Sub-task  (was: Bug)

> Same materialized state handle should register multi times
> --
>
> Key: FLINK-25478
> URL: https://issues.apache.org/jira/browse/FLINK-25478
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: Yun Tang
>Priority: Critical
> Fix For: 1.15.0
>
>
> Currently, changelog materialization would call RocksDB state backend's 
> snapshot method to generate {{IncrementalRemoteKeyedStateHandle}} as 
> ChangelogStateBackendHandleImpl's materialized artifacts. And before next 
> materialization, it will always report the same 
> {{IncrementalRemoteKeyedStateHandle}} as before.
> It's fine to register this for the 1st time. However, for the 2nd time to 
> register {{IncrementalRemoteKeyedStateHandle}} (via 
> {{{}ChangelogStateBackendHandleImpl#registerSharedStates{}}}), it will 
> discard the private state artifacts without checking the registration reference:
> IncrementalRemoteKeyedStateHandle:
> {code:java}
> public void discardState() throws Exception {
> try {
> StateUtil.bestEffortDiscardAllStateObjects(privateState.values());
> } catch (Exception e) {
> LOG.warn("Could not properly discard misc file states.", e);
> }
> }
> {code}
> Thus, this would delete the private state (such as RocksDB's MANIFEST), and 
> upon restore, the job would report a FileNotFoundException.
>  
> {code:java}
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught 
> unexpected exception.
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:403)
> at 
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:477)
>  
> at 
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:90)
>  
>     at 
> org.apache.flink.state.changelog.ChangelogStateBackend.lambda$createKeyedStateBackend$1(ChangelogStateBackend.java:153)
> at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:68)
>     at 
> org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:221)
>  
> at 
> org.apache.flink.state.changelog.ChangelogStateBackend.createKeyedStateBackend(ChangelogStateBackend.java:145)
>  
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
>  
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>  
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>  
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
>  
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
>  
> ... 10 more
> Caused by: java.io.FileNotFoundException: x
> at 
> org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.open(PluginFileSystemFactory.java:127)
>  
> at 
> org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
>  
> at 
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:127)
>  
> at 
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:110)
>     at 
> org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:49)
>  
> at 
> java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
>  ~[?:1.8.0_102]
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)
>  ~[?:1.8.0_102]
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)
>  ~[?:1.8.0_102]
> ... 1 more {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25478) Same materialized state handle should register multi times

2021-12-30 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-25478:
-
Description: 
Currently, changelog materialization would call RocksDB state backend's 
snapshot method to generate {{IncrementalRemoteKeyedStateHandle}} as 
ChangelogStateBackendHandleImpl's materialized artifacts. And before next 
materialization, it will always report the same 
{{IncrementalRemoteKeyedStateHandle}} as before.

It's fine to register this for the 1st time. However, for the 2nd time to 
register {{IncrementalRemoteKeyedStateHandle}} (via 
{{{}ChangelogStateBackendHandleImpl#registerSharedStates{}}}), it will discard 
the private state artifacts without checking the registration reference:

IncrementalRemoteKeyedStateHandle:
{code:java}
public void discardState() throws Exception {

try {
StateUtil.bestEffortDiscardAllStateObjects(privateState.values());
} catch (Exception e) {
LOG.warn("Could not properly discard misc file states.", e);
}
}
{code}
Thus, this would delete the private state (such as RocksDB's MANIFEST), and 
upon restore, the job would report a FileNotFoundException.

 
{code:java}
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught 
unexpected exception.
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:403)
at 
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:477)
 
at 
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:90)
 
    at 
org.apache.flink.state.changelog.ChangelogStateBackend.lambda$createKeyedStateBackend$1(ChangelogStateBackend.java:153)
at 
org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:68)
    at 
org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:221)
 
at 
org.apache.flink.state.changelog.ChangelogStateBackend.createKeyedStateBackend(ChangelogStateBackend.java:145)
 
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
 
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
 
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
 
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
 
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
 
... 10 more
Caused by: java.io.FileNotFoundException: x
at 
org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.open(PluginFileSystemFactory.java:127)
 
at 
org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
 
at 
org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:127)
 
at 
org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:110)
    at 
org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:49)
 
at 
java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
 ~[?:1.8.0_102]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147) 
~[?:1.8.0_102]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622) 
~[?:1.8.0_102]
... 1 more {code}
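The fix direction implied above is reference counting: re-registering the same materialized handle must only bump a count, and the private files may be discarded only when the last reference is released. A minimal sketch follows; the registry class and the `MANIFEST` key are illustrative, not Flink's SharedStateRegistry API.

```java
import java.util.HashMap;
import java.util.Map;

public class RefCountedRegistry {
    private final Map<String, Integer> refCounts = new HashMap<>();

    // Registering the same handle key again only increments the count,
    // instead of discarding the previously registered artifacts.
    public void register(String handleKey) {
        refCounts.merge(handleKey, 1, Integer::sum);
    }

    // Returns true only when the last reference is gone, i.e. only then
    // is it safe to discard the private state behind the handle.
    public boolean unregister(String handleKey) {
        int remaining = refCounts.merge(handleKey, -1, Integer::sum);
        if (remaining <= 0) {
            refCounts.remove(handleKey);
            return true;  // last reference released: discard is safe
        }
        return false;     // still referenced by another checkpoint
    }

    public static void main(String[] args) {
        RefCountedRegistry registry = new RefCountedRegistry();
        registry.register("MANIFEST-000042"); // checkpoint 1
        registry.register("MANIFEST-000042"); // checkpoint 2 re-registers same handle
        System.out.println(registry.unregister("MANIFEST-000042")); // prints false
        System.out.println(registry.unregister("MANIFEST-000042")); // prints true
    }
}
```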

  was:
Currently, changelog materialization would call RocksDB state backend's 
snapshot method to generate {{IncrementalRemoteKeyedStateHandle}} as 
ChangelogStateBackendHandleImpl's materialized artifacts. And before next 
materialization, it will always report the same 
{{IncrementalRemoteKeyedStateHandle}} as before.

It's fine to register this for the 1st time. However, for the 2nd time to 
register {{IncrementalRemoteKeyedStateHandle}} (via 
{{{}ChangelogStateBackendHandleImpl#registerSharedStates{}}}), it will discard 
the private state artifacts without check the register reference:

IncrementalRemoteKeyedStateHandle:
{code:java}
public void discardState() throws Exception {

try {
StateUtil.bestEffortDiscardAllStateObjects(privateState.values());
} catch (Exception e) {
LOG.warn("Could not properly discard misc file states.", e);
}
}
{code}
Thus, this would delete the private state (such as RocksDB's MANIFEST), and 
once restore, job would not report Fi

[jira] [Updated] (FLINK-25478) Same materialized state handle should register multi times

2021-12-30 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-25478:
-
Summary: Same materialized state handle should register multi times  (was: 
Changelog materialization with incremental checkpoint could cause checkpointed 
data lost)

> Same materialized state handle should register multi times
> --
>
> Key: FLINK-25478
> URL: https://issues.apache.org/jira/browse/FLINK-25478
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: Yun Tang
>Priority: Critical
> Fix For: 1.15.0
>
>
> Currently, changelog materialization would call RocksDB state backend's 
> snapshot method to generate {{IncrementalRemoteKeyedStateHandle}} as 
> ChangelogStateBackendHandleImpl's materialized artifacts. And before next 
> materialization, it will always report the same 
> {{IncrementalRemoteKeyedStateHandle}} as before.
> It's fine to register this for the 1st time. However, for the 2nd time to 
> register {{IncrementalRemoteKeyedStateHandle}} (via 
> {{{}ChangelogStateBackendHandleImpl#registerSharedStates{}}}), it will 
> discard the private state artifacts without checking the registration reference:
> IncrementalRemoteKeyedStateHandle:
> {code:java}
> public void discardState() throws Exception {
> try {
> StateUtil.bestEffortDiscardAllStateObjects(privateState.values());
> } catch (Exception e) {
> LOG.warn("Could not properly discard misc file states.", e);
> }
> }
> {code}
> Thus, this would delete the private state (such as RocksDB's MANIFEST), and 
> upon restore, the job would report a FileNotFoundException.
>  
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught 
> unexpected exception.
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:403)
> at 
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:477)
>  
> at 
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:90)
>  
>     at 
> org.apache.flink.state.changelog.ChangelogStateBackend.lambda$createKeyedStateBackend$1(ChangelogStateBackend.java:153)
> at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:68)
>     at 
> org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:221)
>  
> at 
> org.apache.flink.state.changelog.ChangelogStateBackend.createKeyedStateBackend(ChangelogStateBackend.java:145)
>  
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
>  
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>  
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>  
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
>  
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
>  
> ... 10 more
> Caused by: java.io.FileNotFoundException: x
> at 
> org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.open(PluginFileSystemFactory.java:127)
>  
> at 
> org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
>  
> at 
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:127)
>  
> at 
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:110)
>     at 
> org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:49)
>  
> at 
> java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
>  ~[?:1.8.0_102]
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)
>  ~[?:1.8.0_102]
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)
>  ~[?:1.8.0_102]
> ... 1 more



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25478) Changelog materialization with incremental checkpoint could cause checkpointed data lost

2021-12-30 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-25478:
-
Description: 
Currently, changelog materialization would call RocksDB state backend's 
snapshot method to generate {{IncrementalRemoteKeyedStateHandle}} as 
ChangelogStateBackendHandleImpl's materialized artifacts. And before next 
materialization, it will always report the same 
{{IncrementalRemoteKeyedStateHandle}} as before.

It's fine to register this for the 1st time. However, for the 2nd time to 
register {{IncrementalRemoteKeyedStateHandle}} (via 
{{{}ChangelogStateBackendHandleImpl#registerSharedStates{}}}), it will discard 
the private state artifacts without checking the registration reference:

IncrementalRemoteKeyedStateHandle:
{code:java}
public void discardState() throws Exception {

try {
StateUtil.bestEffortDiscardAllStateObjects(privateState.values());
} catch (Exception e) {
LOG.warn("Could not properly discard misc file states.", e);
}
}
{code}
Thus, this would delete the private state (such as RocksDB's MANIFEST), and 
upon restore, the job would report a FileNotFoundException.

 

{code:java}
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught 
unexpected exception.
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:403)
at 
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:477)
at 
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:90)
    at 
org.apache.flink.state.changelog.ChangelogStateBackend.lambda$createKeyedStateBackend$1(ChangelogStateBackend.java:153)
at 
org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:68)
    at 
org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:221)
at 
org.apache.flink.state.changelog.ChangelogStateBackend.createKeyedStateBackend(ChangelogStateBackend.java:145)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
... 10 more
Caused by: java.io.FileNotFoundException: x
at 
org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.open(PluginFileSystemFactory.java:127)
at 
org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:127)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:110)
    at 
org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:49)
at 
java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626) 
~[?:1.8.0_102]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147) 
~[?:1.8.0_102]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622) 
~[?:1.8.0_102]
... 1 more {code}

  was:
Currently, changelog materialization would call RocksDB state backend's 
snapshot method to generate {{IncrementalRemoteKeyedStateHandle}} as 
ChangelogStateBackendHandleImpl's materialized artifacts. And before next 
materialization, it will always report the same 
{{IncrementalRemoteKeyedStateHandle}} as before.

It's fine to register this for the 1st time. However, for the 2nd time to 
register {{IncrementalRemoteKeyedStateHandle}} (via 
{{ChangelogStateBackendHandleImpl#registerSharedStates}}), it will discard the 
private state artifacts without checking the registration reference:

IncrementalRemoteKeyedStateHandle:
{code:java}
public void discardState() throws Exception {

try {
StateUtil.bestEffortDiscardAllStateObjects(privateState.values());
} catch (Exception e) {
LOG.warn("Could not properly discard misc file states.", e);
}
}
{code}

Thus, this would delete the private state (such as RocksDB's MANIFEST), and 
once restore, job would not report 

[jira] [Assigned] (FLINK-25477) The directory structure of the State Backends document is not standardized

2021-12-29 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25477?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-25477:


Assignee: Hangxiang Yu

> The directory structure of the State Backends document is not standardized
> --
>
> Key: FLINK-25477
> URL: https://issues.apache.org/jira/browse/FLINK-25477
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Runtime / State Backends
>Reporter: Hangxiang Yu
>Assignee: Hangxiang Yu
>Priority: Minor
> Fix For: 1.15.0
>
> Attachments: image-2021-12-29-16-56-24-657.png
>
>
> The State Backends document uses multiple first-level headings. 
> This may cause the directory structure to be displayed incorrectly.
> As the picture shows, the two titles are not in the table of contents on 
> the right.
>  
> !image-2021-12-29-16-56-24-657.png|width=522,height=230!



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25479) Changelog materialization with incremental checkpoint cannot work well in local tests

2021-12-29 Thread Yun Tang (Jira)
Yun Tang created FLINK-25479:


 Summary: Changelog materialization with incremental checkpoint 
cannot work well in local tests
 Key: FLINK-25479
 URL: https://issues.apache.org/jira/browse/FLINK-25479
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing, Runtime / State Backends, Tests
Reporter: Yun Tang
 Fix For: 1.15.0


Currently, changelog materialization would call RocksDB state backend's 
snapshot method to generate {{IncrementalRemoteKeyedStateHandle}} as 
ChangelogStateBackendHandleImpl's materialized artifacts. Before the next 
materialization, it will always report the same 
{{IncrementalRemoteKeyedStateHandle}} as before.

For local tests, TM would report the {{IncrementalRemoteKeyedStateHandle}} to 
JM via a local {{LocalRpcInvocation}}. However, since {{LocalRpcInvocation}} does 
not de/serialize the message, registering the 
{{IncrementalRemoteKeyedStateHandle}} on the JM side also attaches a 
{{sharedStateRegistry}} to the instance located on the TM side. For the 2nd 
checkpoint, TM would report the same {{IncrementalRemoteKeyedStateHandle}}, now 
carrying a {{sharedStateRegistry}}, to JM, which then throws an exception because 
the handle already contains a {{sharedStateRegistry}}:

IncrementalRemoteKeyedStateHandle
{code:java}
public void registerSharedStates(SharedStateRegistry stateRegistry, long checkpointID) {
    Preconditions.checkState(
            sharedStateRegistry != stateRegistry,
            "The state handle has already registered its shared states to the given registry.");

}
{code}

This bug does not show up in a distributed environment, as the 
{{IncrementalRemoteKeyedStateHandle}} would be serialized and 
{{sharedStateRegistry}} is tagged as {{transient}}.
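The local-vs-remote difference can be illustrated with a minimal, self-contained sketch (the {{Handle}} class below is hypothetical, not Flink's actual handle): a {{transient}} field survives when the same object is passed locally, but is dropped by Java serialization, which is exactly why the leaked {{sharedStateRegistry}} only shows up in local tests.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientDemo {
    /** Hypothetical stand-in for IncrementalRemoteKeyedStateHandle. */
    static class Handle implements Serializable {
        // Like sharedStateRegistry: excluded from the serialized form.
        transient Object registry;
    }

    /** Simulates a remote RPC: serialize the handle and read it back. */
    static Handle roundTrip(Handle handle) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(handle);
        }
        try (ObjectInputStream ois =
                new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return (Handle) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Handle handle = new Handle();
        handle.registry = new Object(); // "registered" on the TM side

        // Local RPC passes the very same object: the registry is still attached.
        Handle local = handle;
        System.out.println(local.registry != null); // true

        // Remote RPC de/serializes: the transient registry is reset to null.
        System.out.println(roundTrip(handle).registry == null); // true
    }
}
```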



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25478) Changelog materialization with incremental checkpoint could cause checkpointed data lost

2021-12-29 Thread Yun Tang (Jira)
Yun Tang created FLINK-25478:


 Summary: Changelog materialization with incremental checkpoint 
could cause checkpointed data lost
 Key: FLINK-25478
 URL: https://issues.apache.org/jira/browse/FLINK-25478
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing, Runtime / State Backends
Reporter: Yun Tang
 Fix For: 1.15.0


Currently, changelog materialization would call RocksDB state backend's 
snapshot method to generate {{IncrementalRemoteKeyedStateHandle}} as 
ChangelogStateBackendHandleImpl's materialized artifacts. Before the next 
materialization, it will always report the same 
{{IncrementalRemoteKeyedStateHandle}} as before.

It is fine to register this handle the 1st time. However, when registering the 
{{IncrementalRemoteKeyedStateHandle}} a 2nd time (via 
{{ChangelogStateBackendHandleImpl#registerSharedStates}}), it will discard the 
private state artifacts without checking the registration reference:

IncrementalRemoteKeyedStateHandle:
{code:java}
public void discardState() throws Exception {

    try {
        StateUtil.bestEffortDiscardAllStateObjects(privateState.values());
    } catch (Exception e) {
        LOG.warn("Could not properly discard misc file states.", e);
    }
}
{code}

Thus, this would delete the private state (such as RocksDB's MANIFEST), and 
once restored, the job would report FileNotFoundException.
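A minimal sketch of the missing guard (class and method names are illustrative, not Flink's actual fix): private artifacts should only be discarded once no registry still references the handle, e.g. by tracking a registration count.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative only: a handle that refuses to discard while still registered. */
public class RefCountedHandle {
    private final AtomicInteger registrations = new AtomicInteger();

    public void registerSharedStates() {
        registrations.incrementAndGet();
    }

    public void unregister() {
        registrations.decrementAndGet();
    }

    /**
     * Discards private artifacts (e.g. the MANIFEST file) only when no
     * registry references them; returns whether discarding happened.
     */
    public boolean discardState() {
        if (registrations.get() > 0) {
            return false; // still registered: keep the private files
        }
        // A real implementation would delete the private artifacts here.
        return true;
    }
}
```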




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (FLINK-25465) Correct the logic of materializing wrapped changelog state

2021-12-28 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25465?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang resolved FLINK-25465.
--
Resolution: Fixed

merged in master: 57391962d5119816beebec56e78a034130e0bfdd

> Correct the logic of materializing wrapped changelog state
> -
>
> Key: FLINK-25465
> URL: https://issues.apache.org/jira/browse/FLINK-25465
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> {{ChangelogKeyedStateBackend#keyValueStatesByName}} would store wrapped 
> state, such as TTL state or latency-tracking state. It will throw an exception 
> in initMaterialization if the state is wrapped:
> {code:java}
> for (InternalKvState changelogState : keyValueStatesByName.values()) {
>     checkState(changelogState instanceof ChangelogState);
>     ((ChangelogState) changelogState).resetWritingMetaFlag();
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (FLINK-25446) Avoid sanity check on read bytes on DataInputStream#read(byte[])

2021-12-28 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang resolved FLINK-25446.
--
Resolution: Fixed

Merged

master: c0f46ef324c35b3ed7813c74931ab9cb589896f7

release-1.14: 24016b83eec53ba7124d651201daa580323b80fc

> Avoid sanity check on read bytes on DataInputStream#read(byte[])
> 
>
> Key: FLINK-25446
> URL: https://issues.apache.org/jira/browse/FLINK-25446
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.14.2
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.14.3
>
>
> Current changelog-related code checks whether the number of read bytes equals 
> the target size:
> {code:java}
> checkState(size == input.read(bytes));
> {code}
> However, this is not correct, as the Javadoc says: {{"An attempt is made to 
> read as many as len bytes, but a smaller number may be read, possibly zero."}}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-5279) Improve error message when trying to access keyed state in non-keyed operator

2021-12-28 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-5279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17466060#comment-17466060
 ] 

Yun Tang commented on FLINK-5279:
-

The current error message is "{{{}Keyed state can only be used on a 'keyed 
stream', i.e., after a 'keyBy()' operation.{}}}", and I think we could still 
improve it by adding the state descriptor name.

> Improve error message when trying to access keyed state in non-keyed operator
> -
>
> Key: FLINK-5279
> URL: https://issues.apache.org/jira/browse/FLINK-5279
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.1.3
>Reporter: Ufuk Celebi
>Priority: Minor
>  Labels: auto-deprioritized-major, stale-minor
>
> When trying to access keyed state in a non-keyed operator, the error message 
> is not very helpful. You get a trace like this:
> {code}
> java.lang.RuntimeException: Error while getting state
> ...
> Caused by: java.lang.RuntimeException: State key serializer has not been 
> configured in the config. This operation cannot use partitioned state.
> {code}
> It would be helpful if the message were more explicit, stating that 
> the API can only be used on keyed streams, etc.
> If this applies to the current master as well, we should fix it there, too.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-21218) "state.checkpoints.dir" should be required only when "execution.checkpointing.interval" is specified

2021-12-28 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-21218.

Resolution: Information Provided

Since Flink already uses the new EmbeddedRocksDBStateBackend API to replace 
RocksDBStateBackend in the state backend loader, this problem is gone.

> "state.checkpoints.dir" should be required only when 
> "execution.checkpointing.interval" is specified
> 
>
> Key: FLINK-21218
> URL: https://issues.apache.org/jira/browse/FLINK-21218
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Dongwon Kim
>Priority: Minor
>
> Users have to specify "state.checkpoints.dir" even when using a state 
> backend as an unreliable per-key state storage. 
> Thread in user ML : 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Setting-quot-unreliable-quot-RocksDB-state-backend-w-o-quot-execution-checkpointing-interval-quot-ans-td41003.html



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25426) UnalignedCheckpointRescaleITCase.shouldRescaleUnalignedCheckpoint fails on AZP because it cannot allocate enough network buffers

2021-12-28 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17466044#comment-17466044
 ] 

Yun Tang commented on FLINK-25426:
--

Another instance: 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=28634&view=logs&j=5c8e7682-d68f-54d1-16a2-a09310218a49&t=86f654fa-ab48-5c1a-25f4-7e7f6afb9bba

> UnalignedCheckpointRescaleITCase.shouldRescaleUnalignedCheckpoint fails on 
> AZP because it cannot allocate enough network buffers
> 
>
> Key: FLINK-25426
> URL: https://issues.apache.org/jira/browse/FLINK-25426
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.0
>Reporter: Till Rohrmann
>Assignee: Anton Kalashnikov
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.15.0
>
>
> The test 
> {{UnalignedCheckpointRescaleITCase.shouldRescaleUnalignedCheckpoint}} fails 
> with
> {code}
> 2021-12-23T02:54:46.2862342Z Dec 23 02:54:46 [ERROR] 
> UnalignedCheckpointRescaleITCase.shouldRescaleUnalignedCheckpoint  Time 
> elapsed: 2.992 s  <<< ERROR!
> 2021-12-23T02:54:46.2865774Z Dec 23 02:54:46 java.lang.OutOfMemoryError: 
> Could not allocate enough memory segments for NetworkBufferPool (required 
> (Mb): 64, allocated (Mb): 14, missing (Mb): 50). Cause: Direct buffer memory. 
> The direct out-of-memory error has occurred. This can mean two things: either 
> job(s) require(s) a larger size of JVM direct memory or there is a direct 
> memory leak. The direct memory can be allocated by user code or some of its 
> dependencies. In this case 'taskmanager.memory.task.off-heap.size' 
> configuration option should be increased. Flink framework and its 
> dependencies also consume the direct memory, mostly for network 
> communication. The most of network memory is managed by Flink and should not 
> result in out-of-memory error. In certain special cases, in particular for 
> jobs with high parallelism, the framework may require more direct memory 
> which is not managed by Flink. In this case 
> 'taskmanager.memory.framework.off-heap.size' configuration option should be 
> increased. If the error persists then there is probably a direct memory leak 
> in user code or some of its dependencies which has to be investigated and 
> fixed. The task executor has to be shutdown...
> 2021-12-23T02:54:46.2868239Z Dec 23 02:54:46  at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.(NetworkBufferPool.java:138)
> 2021-12-23T02:54:46.2868975Z Dec 23 02:54:46  at 
> org.apache.flink.runtime.io.network.NettyShuffleServiceFactory.createNettyShuffleEnvironment(NettyShuffleServiceFactory.java:140)
> 2021-12-23T02:54:46.2869771Z Dec 23 02:54:46  at 
> org.apache.flink.runtime.io.network.NettyShuffleServiceFactory.createNettyShuffleEnvironment(NettyShuffleServiceFactory.java:94)
> 2021-12-23T02:54:46.2870550Z Dec 23 02:54:46  at 
> org.apache.flink.runtime.io.network.NettyShuffleServiceFactory.createShuffleEnvironment(NettyShuffleServiceFactory.java:79)
> 2021-12-23T02:54:46.2871312Z Dec 23 02:54:46  at 
> org.apache.flink.runtime.io.network.NettyShuffleServiceFactory.createShuffleEnvironment(NettyShuffleServiceFactory.java:58)
> 2021-12-23T02:54:46.2872062Z Dec 23 02:54:46  at 
> org.apache.flink.runtime.taskexecutor.TaskManagerServices.createShuffleEnvironment(TaskManagerServices.java:414)
> 2021-12-23T02:54:46.2872767Z Dec 23 02:54:46  at 
> org.apache.flink.runtime.taskexecutor.TaskManagerServices.fromConfiguration(TaskManagerServices.java:282)
> 2021-12-23T02:54:46.2873436Z Dec 23 02:54:46  at 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.startTaskManager(TaskManagerRunner.java:523)
> 2021-12-23T02:54:46.2877615Z Dec 23 02:54:46  at 
> org.apache.flink.runtime.minicluster.MiniCluster.startTaskManager(MiniCluster.java:645)
> 2021-12-23T02:54:46.2878247Z Dec 23 02:54:46  at 
> org.apache.flink.runtime.minicluster.MiniCluster.startTaskManagers(MiniCluster.java:626)
> 2021-12-23T02:54:46.2878856Z Dec 23 02:54:46  at 
> org.apache.flink.runtime.minicluster.MiniCluster.start(MiniCluster.java:379)
> 2021-12-23T02:54:46.2879487Z Dec 23 02:54:46  at 
> org.apache.flink.runtime.testutils.MiniClusterResource.startMiniCluster(MiniClusterResource.java:209)
> 2021-12-23T02:54:46.2880152Z Dec 23 02:54:46  at 
> org.apache.flink.runtime.testutils.MiniClusterResource.before(MiniClusterResource.java:95)
> 2021-12-23T02:54:46.2880821Z Dec 23 02:54:46  at 
> org.apache.flink.test.util.MiniClusterWithClientResource.before(MiniClusterWithClientResource.java:64)
> 2021-12-23T02:54:46.2881519Z Dec 23 02:54:46  at 
> org.apache.flink.test.checkpointing.UnalignedCheckpointTestBase.execute(UnalignedCheckpo

[jira] [Created] (FLINK-25466) TTL configuration could parse in StateTtlConfig#DISABLED

2021-12-27 Thread Yun Tang (Jira)
Yun Tang created FLINK-25466:


 Summary: TTL configuration could parse in StateTtlConfig#DISABLED
 Key: FLINK-25466
 URL: https://issues.apache.org/jira/browse/FLINK-25466
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Reporter: Yun Tang
Assignee: Yun Tang
 Fix For: 1.15.0, 1.13.6, 1.14.3


The current API {{StateDescriptor#enableTimeToLive(StateTtlConfig)}} cannot handle 
{{StateTtlConfig#DISABLED}} due to its current implementation.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25465) Correct the logic of materializing wrapped changelog state

2021-12-27 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25465?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-25465:
-
Summary: Correct the logic of materializing wrapped changelog state  (was: 
Correct the logic to materialize on wrapped changelog state)

> Correct the logic of materializing wrapped changelog state
> -
>
> Key: FLINK-25465
> URL: https://issues.apache.org/jira/browse/FLINK-25465
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
> Fix For: 1.15.0
>
>
> {{ChangelogKeyedStateBackend#keyValueStatesByName}} would store wrapped 
> state, such as TTL state or latency-tracking state. It will throw an exception 
> in initMaterialization if the state is wrapped:
> {code:java}
> for (InternalKvState changelogState : keyValueStatesByName.values()) {
>     checkState(changelogState instanceof ChangelogState);
>     ((ChangelogState) changelogState).resetWritingMetaFlag();
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25465) Correct the logic to materialize on wrapped changelog state

2021-12-27 Thread Yun Tang (Jira)
Yun Tang created FLINK-25465:


 Summary: Correct the logic to materialize on wrapped changelog state
 Key: FLINK-25465
 URL: https://issues.apache.org/jira/browse/FLINK-25465
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Reporter: Yun Tang
Assignee: Yun Tang
 Fix For: 1.15.0


{{ChangelogKeyedStateBackend#keyValueStatesByName}} would store wrapped state, 
such as TTL state or latency-tracking state. It will throw an exception in 
initMaterialization if the state is wrapped:


{code:java}
for (InternalKvState changelogState : keyValueStatesByName.values()) {
    checkState(changelogState instanceof ChangelogState);
    ((ChangelogState) changelogState).resetWritingMetaFlag();
}
{code}
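The failing {{checkState(changelogState instanceof ChangelogState)}} can be reproduced in miniature (the interfaces below are stand-ins, not Flink's actual types): a TTL or latency-tracking wrapper delegates to a changelog state but is not itself one, so the {{instanceof}} check on the stored wrapper fails.

```java
public class WrappedStateDemo {
    interface State { String value(); }

    /** Stand-in for Flink's ChangelogState marker. */
    interface ChangelogState extends State {}

    static class PlainChangelogState implements ChangelogState {
        public String value() { return "v"; }
    }

    /** Like a TTL/latency-tracking wrapper: delegates, but is only a State. */
    static class TrackingWrapper implements State {
        private final State delegate;
        TrackingWrapper(State delegate) { this.delegate = delegate; }
        public String value() { return delegate.value(); }
    }

    public static void main(String[] args) {
        State stored = new TrackingWrapper(new PlainChangelogState());
        // What initMaterialization effectively asserts -- and why it fails:
        System.out.println(stored instanceof ChangelogState); // false
    }
}
```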




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25426) UnalignedCheckpointRescaleITCase.shouldRescaleUnalignedCheckpoint fails on AZP because it cannot allocate enough network buffers

2021-12-27 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17465955#comment-17465955
 ] 

Yun Tang commented on FLINK-25426:
--

Another instance: 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=28607&view=logs&j=38d6b56a-d502-56fb-7b73-c09f8fe7becd&t=6e6509fa-8a5d-5a6c-e17e-64f5ecc17842

> UnalignedCheckpointRescaleITCase.shouldRescaleUnalignedCheckpoint fails on 
> AZP because it cannot allocate enough network buffers
> 
>
> Key: FLINK-25426
> URL: https://issues.apache.org/jira/browse/FLINK-25426
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.0
>Reporter: Till Rohrmann
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.15.0
>
>
> The test 
> {{UnalignedCheckpointRescaleITCase.shouldRescaleUnalignedCheckpoint}} fails 
> with
> {code}
> 2021-12-23T02:54:46.2862342Z Dec 23 02:54:46 [ERROR] 
> UnalignedCheckpointRescaleITCase.shouldRescaleUnalignedCheckpoint  Time 
> elapsed: 2.992 s  <<< ERROR!
> 2021-12-23T02:54:46.2865774Z Dec 23 02:54:46 java.lang.OutOfMemoryError: 
> Could not allocate enough memory segments for NetworkBufferPool (required 
> (Mb): 64, allocated (Mb): 14, missing (Mb): 50). Cause: Direct buffer memory. 
> The direct out-of-memory error has occurred. This can mean two things: either 
> job(s) require(s) a larger size of JVM direct memory or there is a direct 
> memory leak. The direct memory can be allocated by user code or some of its 
> dependencies. In this case 'taskmanager.memory.task.off-heap.size' 
> configuration option should be increased. Flink framework and its 
> dependencies also consume the direct memory, mostly for network 
> communication. The most of network memory is managed by Flink and should not 
> result in out-of-memory error. In certain special cases, in particular for 
> jobs with high parallelism, the framework may require more direct memory 
> which is not managed by Flink. In this case 
> 'taskmanager.memory.framework.off-heap.size' configuration option should be 
> increased. If the error persists then there is probably a direct memory leak 
> in user code or some of its dependencies which has to be investigated and 
> fixed. The task executor has to be shutdown...
> 2021-12-23T02:54:46.2868239Z Dec 23 02:54:46  at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.(NetworkBufferPool.java:138)
> 2021-12-23T02:54:46.2868975Z Dec 23 02:54:46  at 
> org.apache.flink.runtime.io.network.NettyShuffleServiceFactory.createNettyShuffleEnvironment(NettyShuffleServiceFactory.java:140)
> 2021-12-23T02:54:46.2869771Z Dec 23 02:54:46  at 
> org.apache.flink.runtime.io.network.NettyShuffleServiceFactory.createNettyShuffleEnvironment(NettyShuffleServiceFactory.java:94)
> 2021-12-23T02:54:46.2870550Z Dec 23 02:54:46  at 
> org.apache.flink.runtime.io.network.NettyShuffleServiceFactory.createShuffleEnvironment(NettyShuffleServiceFactory.java:79)
> 2021-12-23T02:54:46.2871312Z Dec 23 02:54:46  at 
> org.apache.flink.runtime.io.network.NettyShuffleServiceFactory.createShuffleEnvironment(NettyShuffleServiceFactory.java:58)
> 2021-12-23T02:54:46.2872062Z Dec 23 02:54:46  at 
> org.apache.flink.runtime.taskexecutor.TaskManagerServices.createShuffleEnvironment(TaskManagerServices.java:414)
> 2021-12-23T02:54:46.2872767Z Dec 23 02:54:46  at 
> org.apache.flink.runtime.taskexecutor.TaskManagerServices.fromConfiguration(TaskManagerServices.java:282)
> 2021-12-23T02:54:46.2873436Z Dec 23 02:54:46  at 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.startTaskManager(TaskManagerRunner.java:523)
> 2021-12-23T02:54:46.2877615Z Dec 23 02:54:46  at 
> org.apache.flink.runtime.minicluster.MiniCluster.startTaskManager(MiniCluster.java:645)
> 2021-12-23T02:54:46.2878247Z Dec 23 02:54:46  at 
> org.apache.flink.runtime.minicluster.MiniCluster.startTaskManagers(MiniCluster.java:626)
> 2021-12-23T02:54:46.2878856Z Dec 23 02:54:46  at 
> org.apache.flink.runtime.minicluster.MiniCluster.start(MiniCluster.java:379)
> 2021-12-23T02:54:46.2879487Z Dec 23 02:54:46  at 
> org.apache.flink.runtime.testutils.MiniClusterResource.startMiniCluster(MiniClusterResource.java:209)
> 2021-12-23T02:54:46.2880152Z Dec 23 02:54:46  at 
> org.apache.flink.runtime.testutils.MiniClusterResource.before(MiniClusterResource.java:95)
> 2021-12-23T02:54:46.2880821Z Dec 23 02:54:46  at 
> org.apache.flink.test.util.MiniClusterWithClientResource.before(MiniClusterWithClientResource.java:64)
> 2021-12-23T02:54:46.2881519Z Dec 23 02:54:46  at 
> org.apache.flink.test.checkpointing.UnalignedCheckpointTestBase.execute(UnalignedCheckpointTestBase.java:151)
> 2021-12-23T02:54:

[jira] [Resolved] (FLINK-24785) Relocate RocksDB's log under flink log directory by default

2021-12-27 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang resolved FLINK-24785.
--
Resolution: Fixed

merged in master: cdef0973b21196ca23a31486f5a996f0b397136d

> Relocate RocksDB's log under flink log directory by default
> ---
>
> Key: FLINK-24785
> URL: https://issues.apache.org/jira/browse/FLINK-24785
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Nicholas Jiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Previously, RocksDB's log was located in its own DB folder, which made 
> debugging RocksDB not so easy. We could let RocksDB's log stay in Flink's log 
> directory by default.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25260) Recovery fails when using changelog+s3+presto

2021-12-27 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17465646#comment-17465646
 ] 

Yun Tang commented on FLINK-25260:
--

I think this issue would be resolved by 
https://issues.apache.org/jira/browse/FLINK-25446

> Recovery fails when using changelog+s3+presto
> -
>
> Key: FLINK-25260
> URL: https://issues.apache.org/jira/browse/FLINK-25260
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem, Runtime / State Backends
>Affects Versions: 1.15.0
>Reporter: Roman Khachatryan
>Priority: Major
>
> Recovery succeeds when using the local FS or the Hadoop S3 plugin, but fails 
> with Presto:
> {code}
> Caused by: java.lang.IllegalStateException
> at org.apache.flink.util.Preconditions.checkState(Preconditions.java:177)
> at 
> org.apache.flink.changelog.fs.StateChangeFormat$1.readChange(StateChangeFormat.java:138)
> at 
> org.apache.flink.changelog.fs.StateChangeFormat$1.next(StateChangeFormat.java:129)
> at 
> org.apache.flink.changelog.fs.StateChangeFormat$1.next(StateChangeFormat.java:98)
> at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.next(StateChangelogHandleStreamHandleReader.java:76)
> at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.next(StateChangelogHandleStreamHandleReader.java:61)
> at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.readBackendHandle(ChangelogBackendRestoreOperation.java:94)
> at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:74)
> at 
> org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:221)
> at 
> org.apache.flink.state.changelog.ChangelogStateBackend.createKeyedStateBackend(ChangelogStateBackend.java:145)
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329)
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> ... 13 more
> {code}
> This is likely caused by some intermediate buffers not being flushed.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25458) Supports local recovery

2021-12-27 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25458?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-25458:
-
Component/s: Runtime / Checkpointing
 Runtime / State Backends

> Supports local recovery
> ---
>
> Key: FLINK-25458
> URL: https://issues.apache.org/jira/browse/FLINK-25458
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: Yun Tang
>Priority: Major
>
> Current changelog state-backend implementation cannot work well with local 
> recovery enabled.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25458) Supports local recovery

2021-12-27 Thread Yun Tang (Jira)
Yun Tang created FLINK-25458:


 Summary: Supports local recovery
 Key: FLINK-25458
 URL: https://issues.apache.org/jira/browse/FLINK-25458
 Project: Flink
  Issue Type: Sub-task
Reporter: Yun Tang


Current changelog state-backend implementation cannot work well with local 
recovery enabled.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25446) Avoid sanity check on read bytes on DataInputStream#read(byte[])

2021-12-26 Thread Yun Tang (Jira)
Yun Tang created FLINK-25446:


 Summary: Avoid sanity check on read bytes on 
DataInputStream#read(byte[])
 Key: FLINK-25446
 URL: https://issues.apache.org/jira/browse/FLINK-25446
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing, Runtime / State Backends
Affects Versions: 1.14.2
Reporter: Yun Tang
Assignee: Yun Tang
 Fix For: 1.15.0, 1.14.3


Current changelog-related code checks whether the number of read bytes equals 
the target size:

{code:java}
checkState(size == input.read(bytes));
{code}

However, this is not correct, as the Javadoc says: {{"An attempt is made to 
read as many as len bytes, but a smaller number may be read, possibly zero."}}
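The contract means a single {{read(byte[])}} may legally return fewer bytes than requested; the safe pattern is {{readFully}} (or an explicit loop). A small sketch with a stream that trickles one byte per call (the {{TricklingStream}} class is invented for the demo):

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

public class ReadFullyDemo {
    /** Returns at most one byte per bulk read() call -- legal per the contract. */
    static class TricklingStream extends InputStream {
        private final InputStream in;
        TricklingStream(InputStream in) { this.in = in; }
        @Override public int read() throws IOException { return in.read(); }
        @Override public int read(byte[] b, int off, int len) throws IOException {
            return in.read(b, off, Math.min(len, 1));
        }
    }

    /** Fragile: one read() call may fill less than the whole buffer. */
    static int singleRead(byte[] data) throws IOException {
        DataInputStream in =
                new DataInputStream(new TricklingStream(new ByteArrayInputStream(data)));
        return in.read(new byte[data.length]);
    }

    /** Safe: readFully loops internally until the buffer is full (or EOF). */
    static byte[] readAll(byte[] data) throws IOException {
        DataInputStream in =
                new DataInputStream(new TricklingStream(new ByteArrayInputStream(data)));
        byte[] buf = new byte[data.length];
        in.readFully(buf);
        return buf;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = {1, 2, 3, 4};
        System.out.println(singleRead(data)); // 1 -- not 4!
        System.out.println(readAll(data)[3]); // 4 -- all bytes arrived
    }
}
```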




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (FLINK-25429) Avoid to close output streams twice during uploading changelogs

2021-12-23 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25429?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang resolved FLINK-25429.
--
Resolution: Fixed

> Avoid to close output streams twice during uploading changelogs
> ---
>
> Key: FLINK-25429
> URL: https://issues.apache.org/jira/browse/FLINK-25429
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Current uploader implementation would close {{stream}} and {{fsStream}} one 
> by one, which leads to {{fsStream}} being closed twice.
> {code:java}
> try (FSDataOutputStream fsStream = fileSystem.create(path, NO_OVERWRITE)) {
>     fsStream.write(compression ? 1 : 0);
>     try (OutputStreamWithPos stream = wrap(fsStream)) {
>         final Map> tasksOffsets = new HashMap<>();
>         for (UploadTask task : tasks) {
>             tasksOffsets.put(task, format.write(stream, task.changeSets));
>         }
>         FileStateHandle handle = new FileStateHandle(path, stream.getPos());
>         // WARN: streams have to be closed before returning the results
>         // otherwise JM may receive invalid handles
>         return new LocalResult(tasksOffsets, handle);
>     }
> }
> {code}
> Not all file systems support closing the same stream twice.
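The double close falls straight out of nested try-with-resources: the wrapper's close() also closes the underlying stream, and then the outer try closes it again. A minimal sketch ({{CountingStream}} is invented for the demo):

```java
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class DoubleCloseDemo {
    /** Counts how often close() is invoked instead of touching a file system. */
    static class CountingStream extends OutputStream {
        int closeCalls = 0;
        @Override public void write(int b) {}
        @Override public void close() { closeCalls++; }
    }

    static int closeCountAfterNestedTry() throws IOException {
        CountingStream fsStream = new CountingStream();
        try (OutputStream outer = fsStream) {
            try (OutputStream wrapped = new FilterOutputStream(outer)) {
                wrapped.write(0);
            } // wrapped.close() also closes the underlying fsStream...
        }     // ...and the outer try closes fsStream a second time
        return fsStream.closeCalls;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(closeCountAfterNestedTry()); // 2
    }
}
```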



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (FLINK-25094) The verify code in LatencyTrackingMapStateTest#verifyIterator is not actually executed

2021-12-23 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang resolved FLINK-25094.
--
Resolution: Fixed

> The verify code in LatencyTrackingMapStateTest#verifyIterator is not actually 
> executed
> --
>
> Key: FLINK-25094
> URL: https://issues.apache.org/jira/browse/FLINK-25094
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends, Tests
>Reporter: Jinzhong Li
>Assignee: Jinzhong Li
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> In LatencyTrackingMapStateTest, 
> iterator()/entries().iterator()/keys().iterator()/values().iterator() is 
> invoked before the verifyIterator method is invoked, that is, 
> iterator()/... is invoked before the test data is put into 
> latencyTrackingMapState. So the verification code is not actually executed, since 
> "iterator.hasNext()" is always false.
> {code:java}
> private  void verifyIterator(
>         LatencyTrackingMapState latencyTrackingState,
>         LatencyTrackingMapState.MapStateLatencyMetrics latencyTrackingStateMetric,
>         Iterator iterator,
>         boolean removeIterator)
>         throws Exception {
>     ThreadLocalRandom random = ThreadLocalRandom.current();
>     for (int index = 1; index <= SAMPLE_INTERVAL; index++) {
>         latencyTrackingState.put((long) index, random.nextDouble());
>     }
>     int count = 1;
>     while (iterator.hasNext()) {
>         int expectedResult = count == SAMPLE_INTERVAL ? 0 : count;
>         assertEquals(expectedResult, latencyTrackingStateMetric.getIteratorHasNextCount());
>         iterator.next();
>         assertEquals(expectedResult, latencyTrackingStateMetric.getIteratorNextCount());
>         if (removeIterator) {
>             iterator.remove();
>             assertEquals(expectedResult, latencyTrackingStateMetric.getIteratorRemoveCount());
>         }
>         count += 1;
>     }
>     // as we call #hasNext one more time than #next, to avoid a complex check,
>     // just reset the hasNext counter in the end.
>     latencyTrackingStateMetric.resetIteratorHasNextCount();
>     latencyTrackingState.clear();
> } {code}
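The ordering bug is reproducible with any plain collection: an iterator obtained while the map is still empty answers {{hasNext() == false}}, so a verification loop guarded by it is silently skipped. A minimal sketch:

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class StaleIteratorDemo {
    /** Creates the iterator BEFORE inserting data, as the test did. */
    static int entriesSeenByStaleIterator(int entries) {
        Map<Long, Double> state = new HashMap<>();
        Iterator<Map.Entry<Long, Double>> it = state.entrySet().iterator();

        for (long i = 1; i <= entries; i++) {
            state.put(i, (double) i);
        }

        int seen = 0;
        while (it.hasNext()) { // false: the iterator predates the data
            it.next();
            seen++;
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(entriesSeenByStaleIterator(10)); // 0
    }
}
```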



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25094) The verify code in LatencyTrackingMapStateTest#verifyIterator is not actually executed

2021-12-23 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17464914#comment-17464914
 ] 

Yun Tang commented on FLINK-25094:
--

Merged in master: d08a1d0f4035485283f1063d4d0fc0a8aaca

> The verify code in LatencyTrackingMapStateTest#verifyIterator is not actually 
> executed
> --
>
> Key: FLINK-25094
> URL: https://issues.apache.org/jira/browse/FLINK-25094
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends, Tests
>Reporter: Jinzhong Li
>Assignee: Jinzhong Li
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> In LatencyTrackingMapStateTest, 
> iterator()/entries().iterator()/keys().iterator()/values().iterator() is 
> invoked before the verifyIterator method is invoked, that is, 
> iterator()/... is invoked before the test data is put into 
> latencyTrackingMapState. So the verification code is not actually executed, since 
> "iterator.hasNext()" is always false.
> {code:java}
> private  void verifyIterator(
> LatencyTrackingMapState 
> latencyTrackingState,
> LatencyTrackingMapState.MapStateLatencyMetrics 
> latencyTrackingStateMetric,
> Iterator iterator,
> boolean removeIterator)
> throws Exception {
> ThreadLocalRandom random = ThreadLocalRandom.current();
> for (int index = 1; index <= SAMPLE_INTERVAL; index++) {
> latencyTrackingState.put((long) index, random.nextDouble());
> }
> int count = 1;
> while (iterator.hasNext()) {
> int expectedResult = count == SAMPLE_INTERVAL ? 0 : count;
> assertEquals(expectedResult, 
> latencyTrackingStateMetric.getIteratorHasNextCount());
> iterator.next();
> assertEquals(expectedResult, 
> latencyTrackingStateMetric.getIteratorNextCount());
> if (removeIterator) {
> iterator.remove();
> assertEquals(expectedResult, 
> latencyTrackingStateMetric.getIteratorRemoveCount());
> }
> count += 1;
> }
> // as we call #hasNext one more time than #next, to avoid a complex check, 
> // just reset the hasNext counter in the end.
> latencyTrackingStateMetric.resetIteratorHasNextCount();
> latencyTrackingState.clear();
> } {code}
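The failure mode described above can be reproduced with a plain java.util.HashMap (a minimal sketch, not Flink's state classes): an iterator obtained while the map is still empty keeps reporting {{hasNext() == false}} even after entries are added later, so the verification loop body never runs.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class IteratorBeforePut {

    /**
     * Returns how many elements a loop over the iterator observes when the
     * iterator was created BEFORE the map was populated, mirroring the bug:
     * the loop guard is false from the start, so the loop body never runs.
     */
    public static int elementsSeenByStaleIterator() {
        Map<Long, Double> state = new HashMap<>();
        Iterator<Long> iterator = state.keySet().iterator(); // obtained too early
        state.put(1L, 0.5); // test data arrives only afterwards
        int seen = 0;
        while (iterator.hasNext()) { // bound to the empty view: always false
            iterator.next();
            seen++;
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(elementsSeenByStaleIterator()); // prints 0
    }
}
```

This is why the fix moves the data insertion before the iterator is created.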





[jira] [Commented] (FLINK-25429) Avoid to close output streams twice during uploading changelogs

2021-12-23 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17464911#comment-17464911
 ] 

Yun Tang commented on FLINK-25429:
--

merged in master:
804eb8dda556a2bea35c69a2662f13d1dafb9255

> Avoid to close output streams twice during uploading changelogs
> ---
>
> Key: FLINK-25429
> URL: https://issues.apache.org/jira/browse/FLINK-25429
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> The current uploader implementation closes {{stream}} and {{fsStream}} one 
> by one, which leads to {{fsStream}} being closed twice.
> {code:java}
> try (FSDataOutputStream fsStream = fileSystem.create(path, 
> NO_OVERWRITE)) {
> fsStream.write(compression ? 1 : 0);
> try (OutputStreamWithPos stream = wrap(fsStream); ) {
> final Map> tasksOffsets 
> = new HashMap<>();
> for (UploadTask task : tasks) {
> tasksOffsets.put(task, format.write(stream, 
> task.changeSets));
> }
> FileStateHandle handle = new FileStateHandle(path, 
> stream.getPos());
> // WARN: streams have to be closed before returning the 
> results
> // otherwise JM may receive invalid handles
> return new LocalResult(tasksOffsets, handle);
> }
> }
> {code}
> Not all file systems support closing the same stream twice.
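The double close can be demonstrated with a small counting stream standing in for the file system stream (a minimal sketch, not Flink's actual uploader or {{FSDataOutputStream}}):

```java
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class DoubleCloseDemo {

    /** Records how many times close() is invoked on it. */
    static class CountingStream extends OutputStream {
        int closeCount = 0;

        @Override
        public void write(int b) {
            // discard the byte; only close-counting matters here
        }

        @Override
        public void close() {
            closeCount++;
        }
    }

    /** Mirrors the nested try-with-resources pattern from the uploader. */
    public static int closesWithNestedTryWithResources() {
        CountingStream underlying = new CountingStream();
        try (OutputStream fsStream = underlying) {
            try (OutputStream stream = new FilterOutputStream(fsStream)) {
                stream.write(1);
            } // closing the wrapper also closes `underlying` (first close)
        } // the outer try-with-resources closes `underlying` again (second close)
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        return underlying.closeCount;
    }

    public static void main(String[] args) {
        System.out.println(closesWithNestedTryWithResources()); // prints 2
    }
}
```

A fix along the lines the ticket suggests is to let only one owner close the underlying stream, e.g. by using a single try-with-resources or a non-closing wrapper around {{fsStream}}.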





[jira] [Created] (FLINK-25438) KafkaProducerExactlyOnceITCase.testMultipleSinkOperators failed due to topic 'exactlyTopicCustomOperator20' already exists

2021-12-23 Thread Yun Tang (Jira)
Yun Tang created FLINK-25438:


 Summary: KafkaProducerExactlyOnceITCase.testMultipleSinkOperators 
failed due to topic 'exactlyTopicCustomOperator20' already exists
 Key: FLINK-25438
 URL: https://issues.apache.org/jira/browse/FLINK-25438
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka, Tests
Reporter: Yun Tang


Dec 23 13:48:21 [ERROR] Failures: 
Dec 23 13:48:21 [ERROR]   
KafkaProducerExactlyOnceITCase.testMultipleSinkOperators:36->KafkaProducerTestBase.testExactlyOnce:236->KafkaTestBase.createTestTopic:216
 Create test topic : exactlyTopicCustomOperator20 failed, 
org.apache.kafka.common.errors.TopicExistsException: Topic 
'exactlyTopicCustomOperator20' already exists.

instance: 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=28524&view=logs&j=c5f0071e-1851-543e-9a45-9ac140befc32&t=15a22db7-8faa-5b34-3920-d33c9f0ca23c





[jira] [Updated] (FLINK-25429) Avoid to close output streams twice during uploading changelogs

2021-12-23 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25429?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-25429:
-
Fix Version/s: 1.15.0

> Avoid to close output streams twice during uploading changelogs
> ---
>
> Key: FLINK-25429
> URL: https://issues.apache.org/jira/browse/FLINK-25429
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
> Fix For: 1.15.0
>
>
> The current uploader implementation closes {{stream}} and {{fsStream}} one 
> by one, which leads to {{fsStream}} being closed twice.
> {code:java}
> try (FSDataOutputStream fsStream = fileSystem.create(path, 
> NO_OVERWRITE)) {
> fsStream.write(compression ? 1 : 0);
> try (OutputStreamWithPos stream = wrap(fsStream); ) {
> final Map> tasksOffsets 
> = new HashMap<>();
> for (UploadTask task : tasks) {
> tasksOffsets.put(task, format.write(stream, 
> task.changeSets));
> }
> FileStateHandle handle = new FileStateHandle(path, 
> stream.getPos());
> // WARN: streams have to be closed before returning the 
> results
> // otherwise JM may receive invalid handles
> return new LocalResult(tasksOffsets, handle);
> }
> }
> {code}
> Not all file systems support closing the same stream twice.





[jira] [Created] (FLINK-25429) Avoid to close output streams twice during uploading changelogs

2021-12-23 Thread Yun Tang (Jira)
Yun Tang created FLINK-25429:


 Summary: Avoid to close output streams twice during uploading 
changelogs
 Key: FLINK-25429
 URL: https://issues.apache.org/jira/browse/FLINK-25429
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Reporter: Yun Tang
Assignee: Yun Tang


The current uploader implementation closes {{stream}} and {{fsStream}} one by 
one, which leads to {{fsStream}} being closed twice.

{code:java}
try (FSDataOutputStream fsStream = fileSystem.create(path, 
NO_OVERWRITE)) {
fsStream.write(compression ? 1 : 0);
try (OutputStreamWithPos stream = wrap(fsStream); ) {
final Map> tasksOffsets = 
new HashMap<>();
for (UploadTask task : tasks) {
tasksOffsets.put(task, format.write(stream, 
task.changeSets));
}
FileStateHandle handle = new FileStateHandle(path, 
stream.getPos());
// WARN: streams have to be closed before returning the results
// otherwise JM may receive invalid handles
return new LocalResult(tasksOffsets, handle);
}
}
{code}

Not all file systems support closing the same stream twice.





[jira] [Assigned] (FLINK-17808) Rename checkpoint meta file to "_metadata" until it has completed writing

2021-12-23 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-17808:


Assignee: Junfan Zhang

> Rename checkpoint meta file to "_metadata" until it has completed writing
> -
>
> Key: FLINK-17808
> URL: https://issues.apache.org/jira/browse/FLINK-17808
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.10.0
>Reporter: Yun Tang
>Assignee: Junfan Zhang
>Priority: Minor
>  Labels: auto-deprioritized-major, pull-request-available
> Fix For: 1.15.0
>
>
> In practice, some developers or customers would use some strategy to find the 
> most recent _metadata file as the checkpoint to recover from (e.g. as many 
> proposals in FLINK-9043 suggest). However, the existence of a "_metadata" file 
> does not mean the checkpoint has been completed, as the writing that creates 
> the "_metadata" file could break on a forced quit (e.g. yarn application -kill).
> We could create the checkpoint meta stream to write data to a file named 
> "_metadata.inprogress" and rename it to "_metadata" once writing completes. 
> By doing so, we could ensure the "_metadata" file is never broken.
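The write-then-rename protocol proposed above can be sketched with {{java.nio.file}} on a local file system. This is a simplified illustration under that assumption; Flink's checkpoint streams and distributed file systems (which may lack atomic rename or recoverable writers) need their own handling.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class AtomicMetadataWrite {

    /**
     * Writes the payload to "_metadata.inprogress" first and renames it to
     * "_metadata" only after the write completed, so a reader probing for
     * "_metadata" never observes a half-written file.
     */
    public static Path writeMetadata(Path checkpointDir, byte[] payload) {
        Path inProgress = checkpointDir.resolve("_metadata.inprogress");
        Path finalPath = checkpointDir.resolve("_metadata");
        try {
            Files.write(inProgress, payload);
            return Files.move(inProgress, finalPath, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Round-trip check: final file exists, in-progress marker is gone. */
    public static boolean demo() {
        try {
            Path dir = Files.createTempDirectory("ckpt");
            Path meta = writeMetadata(dir, new byte[] {1, 2, 3});
            return Files.exists(meta)
                    && !Files.exists(dir.resolve("_metadata.inprogress"));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints true
    }
}
```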





[jira] [Commented] (FLINK-17808) Rename checkpoint meta file to "_metadata" until it has completed writing

2021-12-23 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17464459#comment-17464459
 ] 

Yun Tang commented on FLINK-17808:
--

Since [~zuston] has already provided a PR to review, I have assigned this 
ticket to him.
Please refactor the PR:
1. Consider file systems which do not support a recoverable writer. 
2. Resolve all failed tests of the current PR.
3. Add a separate test to verify the logic.


> Rename checkpoint meta file to "_metadata" until it has completed writing
> -
>
> Key: FLINK-17808
> URL: https://issues.apache.org/jira/browse/FLINK-17808
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.10.0
>Reporter: Yun Tang
>Assignee: Junfan Zhang
>Priority: Minor
>  Labels: auto-deprioritized-major, pull-request-available
> Fix For: 1.15.0
>
>
> In practice, some developers or customers would use some strategy to find the 
> most recent _metadata file as the checkpoint to recover from (e.g. as many 
> proposals in FLINK-9043 suggest). However, the existence of a "_metadata" file 
> does not mean the checkpoint has been completed, as the writing that creates 
> the "_metadata" file could break on a forced quit (e.g. yarn application -kill).
> We could create the checkpoint meta stream to write data to a file named 
> "_metadata.inprogress" and rename it to "_metadata" once writing completes. 
> By doing so, we could ensure the "_metadata" file is never broken.





[jira] [Created] (FLINK-25423) Enable loading state backend via configuration in state processor api

2021-12-22 Thread Yun Tang (Jira)
Yun Tang created FLINK-25423:


 Summary: Enable loading state backend via configuration in state 
processor api
 Key: FLINK-25423
 URL: https://issues.apache.org/jira/browse/FLINK-25423
 Project: Flink
  Issue Type: Improvement
  Components: API / State Processor, Runtime / State Backends
Reporter: Yun Tang
 Fix For: 1.15.0


Currently, the state processor API loads a savepoint via a state backend 
explicitly initialized on the client side, similar to 
{{StreamExecutionEnvironment#setStateBackend(stateBackend)}}:

{code:java}
Savepoint.load(bEnv, "hdfs://path/", new HashMapStateBackend());
{code}

As we all know, the stream environment also supports loading the state backend 
via configuration, which provides the flexibility to load state backends, 
especially customized ones. The state processor API could benefit from a 
similar ability.
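The requested mechanism can be sketched as a small name-to-factory registry that resolves a backend shortcut taken from the configuration. All class names below are hypothetical placeholders; Flink's actual loading machinery is more involved (factories, class loading, defaults).

```java
import java.util.Map;
import java.util.function.Supplier;

public class BackendLoaderSketch {

    interface StateBackend {}

    static class HashMapBackend implements StateBackend {}

    static class RocksDBBackend implements StateBackend {}

    // Registry keyed by the shortcut a user would put under the state backend
    // option in the configuration (names here are illustrative).
    private static final Map<String, Supplier<StateBackend>> FACTORIES = Map.of(
            "hashmap", HashMapBackend::new,
            "rocksdb", RocksDBBackend::new);

    /** Resolves a backend instance from the configured shortcut name. */
    public static StateBackend fromConfiguration(String configuredName) {
        Supplier<StateBackend> factory = FACTORIES.get(configuredName);
        if (factory == null) {
            throw new IllegalArgumentException(
                    "Unknown state backend: " + configuredName);
        }
        return factory.get();
    }

    public static void main(String[] args) {
        System.out.println(fromConfiguration("rocksdb").getClass().getSimpleName());
        // prints RocksDBBackend
    }
}
```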





[jira] [Commented] (FLINK-25360) Add State Desc to CheckpointMetadata

2021-12-22 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17464242#comment-17464242
 ] 

Yun Tang commented on FLINK-25360:
--

[~liufangqi] Since RocksDB incremental checkpoints already have enough 
information in the checkpoint meta, could you try to use the state processor 
API to achieve your goal on that?

Your request actually has two problems, and I think they deserve two different 
discussion threads in the dev mailing list:
 # Could we write state meta info in the savepoint meta? [~dwysakowicz] what do 
you think of this, especially as you have already done the unified savepoint 
format work.
 # Whether we should persist additional info such as the default value or the 
reducingFunction, i.e. persist the whole state descriptor. I prefer to store 
them. [~tzulitai] what do you think of this?

BTW, since many guys are on Christmas vacation, maybe we can launch discussions 
after New Year's Day.

> Add State Desc to CheckpointMetadata
> 
>
> Key: FLINK-25360
> URL: https://issues.apache.org/jira/browse/FLINK-25360
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: 刘方奇
>Priority: Major
> Attachments: image-2021-12-17-20-01-42-423.png
>
>
> Now we can't get the State Descriptor info in the checkpoint meta. For 
> example, if we use the state-processor-api to load state and then rewrite it, 
> we can't use the state flexibly. 
> Maybe there are other cases where we need the State Descriptor, so can we add 
> this info?




