[jira] [Commented] (FLINK-31167) Verify that no exclusions were erroneously added to the japicmp plugin

2023-03-03 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17696081#comment-17696081
 ] 

Biao Liu commented on FLINK-31167:
--

Hi [~mapohl], sorry for the late response.
{quote} wouldn't we be more stricter by throwing an 
UnsupportedOperationException as the default behavior of the deprecated method 
implementation?
{quote}
I think it's a choice worth considering. There is only one scenario I'm worried 
about: if someone implements a new {{OutputFormat}} and the IDE somehow does not 
generate the {{open}} method automatically, the code would compile but fail at 
runtime. That's not very friendly, so I lean slightly towards the less strict 
choice. What do you think?
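
For illustration only, a minimal sketch of the two options being discussed, 
using simplified stand-in names rather than the exact Flink {{OutputFormat}} 
API (the context accessor names here are assumptions):
{code}
// Hypothetical, simplified sketch of the design question above; the names and
// signatures are stand-ins, not the exact Flink OutputFormat API.
public interface SketchOutputFormat {

    interface InitializationContext {
        int getTaskNumber();
        int getNumTasks();
    }

    /** New method: its default bridges to the deprecated variant so legacy formats keep working. */
    default void open(InitializationContext context) throws java.io.IOException {
        open(context.getTaskNumber(), context.getNumTasks());
    }

    /**
     * Deprecated method. Option A ("less strict"): a no-op default, so a format that
     * overrides neither method still compiles and silently does nothing at runtime.
     * Option B ("stricter"): replace the body with
     *   throw new UnsupportedOperationException("implement open(InitializationContext)");
     * which also compiles, but fails only at runtime if the override is forgotten.
     */
    @Deprecated
    default void open(int taskNumber, int numTasks) throws java.io.IOException {
        // Option A: intentionally a no-op.
    }
}
{code}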

> Verify that no exclusions were erroneously added to the japicmp plugin
> --
>
> Key: FLINK-31167
> URL: https://issues.apache.org/jira/browse/FLINK-31167
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Matthias Pohl
>Assignee: Matthias Pohl
>Priority: Major
>
> Verify that no exclusions were erroneously added to the japicmp plugin that 
> break compatibility guarantees. Check the exclusions for the 
> japicmp-maven-plugin in the root pom (see 
> [apache/flink:pom.xml:2175ff|https://github.com/apache/flink/blob/3856c49af77601cf7943a5072d8c932279ce46b4/pom.xml#L2175]
>  for exclusions that:
> * For minor releases: break source compatibility for {{@Public}} APIs
> * For patch releases: break source/binary compatibility for 
> {{@Public}}/{{@PublicEvolving}}  APIs
> Any such exclusion must be properly justified, in advance.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31167) Verify that no exclusions were erroneously added to the japicmp plugin

2023-02-21 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17691970#comment-17691970
 ] 

Biao Liu commented on FLINK-31167:
--

Hi [~mapohl],

As you mentioned, the new method with a context was introduced to replace the 
old one.

Let's take {{finalizeGlobal}} as an example. If we did not provide a default 
implementation for {{finalizeGlobal(int)}}, anyone who wants to use the new 
context-based method would have to implement both {{finalizeGlobal(int)}} and 
{{finalizeGlobal(FinalizationContext)}}, even though the former means nothing 
to them.

That's not what we expect users to do. {{finalizeGlobal(int)}} serves legacy 
code, and {{finalizeGlobal(FinalizationContext)}} serves new implementations; 
users only need to implement one of them. It's similar to how SinkFunction was 
handled in https://issues.apache.org/jira/browse/FLINK-7552.

I think it's not a breaking change, even though it cannot pass the japicmp 
check.
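
For illustration, a minimal sketch of the default-method bridge described 
above. The interface and accessor names are simplified assumptions based on 
the method names mentioned in this comment, not the verbatim Flink API:
{code}
// Simplified sketch of the bridge between the legacy and the context-based method;
// not the exact Flink interface.
public interface SketchFinalizeOnMaster {

    interface FinalizationContext {
        int getParallelism();
    }

    /** Legacy method: a no-op default, so new implementations are not forced to provide it. */
    @Deprecated
    default void finalizeGlobal(int parallelism) throws java.io.IOException {}

    /** New method: the default delegates to the legacy one, so legacy implementations keep working. */
    default void finalizeGlobal(FinalizationContext context) throws java.io.IOException {
        finalizeGlobal(context.getParallelism());
    }
}
{code}
With both defaults in place, an implementation only needs to override one of 
the two methods, which is why a japicmp exclusion is needed even though, as 
argued above, it is not really a breaking change.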

> Verify that no exclusions were erroneously added to the japicmp plugin
> --
>
> Key: FLINK-31167
> URL: https://issues.apache.org/jira/browse/FLINK-31167
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Matthias Pohl
>Assignee: Matthias Pohl
>Priority: Major
>
> Verify that no exclusions were erroneously added to the japicmp plugin that 
> break compatibility guarantees. Check the exclusions for the 
> japicmp-maven-plugin in the root pom (see 
> [apache/flink:pom.xml:2175ff|https://github.com/apache/flink/blob/3856c49af77601cf7943a5072d8c932279ce46b4/pom.xml#L2175]
>  for exclusions that:
> * For minor releases: break source compatibility for {{@Public}} APIs
> * For patch releases: break source/binary compatibility for 
> {{@Public}}/{{@PublicEvolving}}  APIs
> Any such exclusion must be properly justified, in advance.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31124) Add it case for HiveTableSink speculative execution

2023-02-17 Thread Biao Liu (Jira)
Biao Liu created FLINK-31124:


 Summary: Add it case for HiveTableSink speculative execution
 Key: FLINK-31124
 URL: https://issues.apache.org/jira/browse/FLINK-31124
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Reporter: Biao Liu


Part of HiveTableSink has supported speculative execution since 
https://issues.apache.org/jira/browse/FLINK-30823. We would like to add some 
integration test cases for this feature.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31123) Add it case for FileSink speculative execution

2023-02-17 Thread Biao Liu (Jira)
Biao Liu created FLINK-31123:


 Summary: Add it case for FileSink speculative execution
 Key: FLINK-31123
 URL: https://issues.apache.org/jira/browse/FLINK-31123
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Reporter: Biao Liu


FileSink has supported speculative execution since 
https://issues.apache.org/jira/browse/FLINK-30823. We would like to add some 
integration test cases for this feature.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30823) Enable speculative execution for some of the typical built-in sinks

2023-01-31 Thread Biao Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-30823:
-
Description: 
Now that sinks support speculative execution, this ticket enables speculative 
execution for some built-in sinks. For each type of sink (SinkV2, SinkFunction, 
OutputFormat) we picked some typical sinks that require no changes or only 
minor ones.

In this ticket, we would enable DiscardingSink, PrintSinkFunction, PrintSink, 
FileSink and FileSystemOutputFormat to support speculative execution.

Speculative execution might not be meaningful for some of these sinks, like 
DiscardingSink, PrintSinkFunction and PrintSink. However, when these sinks are 
chained with other operators, the chained task cannot run speculatively if any 
operator in the chain does not support speculative execution. That's the main 
idea behind enabling them.

FileSink and FileSystemOutputFormat are the most typical implementations of 
SinkV2 and OutputFormat. We would not enable speculative execution for 
StreamingFileSink, because it is marked as deprecated. HiveTableSink without 
compaction can also support speculative execution, because it depends on 
FileSystemOutputFormat.

  was:
After the Sink supports speculative execution, here we enable speculative 
execution for some built-in sinks. For each type of Sink (SinkV2, SinkFunction, 
OutputFormat) we picked some typical sinks that do not require any or only 
minor changes.

In this ticket, we would enable DiscardingSink, PrintSinkFunction, PrintSink, 
FileSink, FileSystemOutputFormat to support speculative execution.

Speculative execution might not be meaningful for some of these sinks, like 
DiscardingSink, PrintSinkFunction and PrintSink. However, when these sinks are 
chained with other operators, the chained task could not support speculative 
execution if there is any operator in chain does not support speculative 
execution. That's the main idea behind this.

FileSink and FileSystemOutputFormat are the most typical implementation for 
SinkV2 and OutputFormat. We would not enable speculative execution for the 
StreamingFileSink, because it's marked as deprecated.


> Enable speculative execution for some of the typical built-in sinks
> ---
>
> Key: FLINK-30823
> URL: https://issues.apache.org/jira/browse/FLINK-30823
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> After the Sink supports speculative execution, here we enable speculative 
> execution for some built-in sinks. For each type of Sink (SinkV2, 
> SinkFunction, OutputFormat) we picked some typical sinks that do not require 
> any or only minor changes.
> In this ticket, we would enable DiscardingSink, PrintSinkFunction, PrintSink, 
> FileSink, FileSystemOutputFormat to support speculative execution.
> Speculative execution might not be meaningful for some of these sinks, like 
> DiscardingSink, PrintSinkFunction and PrintSink. However, when these sinks 
> are chained with other operators, the chained task could not support 
> speculative execution if there is any operator in chain does not support 
> speculative execution. That's the main idea behind this.
> FileSink and FileSystemOutputFormat are the most typical implementation for 
> SinkV2 and OutputFormat. We would not enable speculative execution for the 
> StreamingFileSink, because it's marked as deprecated. The HiveTableSink 
> without compaction could also support speculative, because it depends on 
> FileSystemOutputFormat.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30823) Enable speculative execution for some of the typical built-in sinks

2023-01-31 Thread Biao Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-30823:
-
Description: 
Now that sinks support speculative execution, this ticket enables speculative 
execution for some built-in sinks. For each type of sink (SinkV2, SinkFunction, 
OutputFormat) we picked some typical sinks that require no changes or only 
minor ones.

In this ticket, we would enable DiscardingSink, PrintSinkFunction, PrintSink, 
FileSink and FileSystemOutputFormat to support speculative execution.

Speculative execution might not be meaningful for some of these sinks, like 
DiscardingSink, PrintSinkFunction and PrintSink. However, when these sinks are 
chained with other operators, the chained task cannot run speculatively if any 
operator in the chain does not support speculative execution. That's the main 
idea behind enabling them.

FileSink and FileSystemOutputFormat are the most typical implementations of 
SinkV2 and OutputFormat. We would not enable speculative execution for 
StreamingFileSink, because it is marked as deprecated.

  was:After the Sink supports speculative execution, here we enable speculative 
execution for some built-in sinks. For each type of Sink (SinkV2, SinkFunction, 
OutputFormat) we picked some typical sinks that do not require any or only 
minor changes.


> Enable speculative execution for some of the typical built-in sinks
> ---
>
> Key: FLINK-30823
> URL: https://issues.apache.org/jira/browse/FLINK-30823
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> After the Sink supports speculative execution, here we enable speculative 
> execution for some built-in sinks. For each type of Sink (SinkV2, 
> SinkFunction, OutputFormat) we picked some typical sinks that do not require 
> any or only minor changes.
> In this ticket, we would enable DiscardingSink, PrintSinkFunction, PrintSink, 
> FileSink, FileSystemOutputFormat to support speculative execution.
> Speculative execution might not be meaningful for some of these sinks, like 
> DiscardingSink, PrintSinkFunction and PrintSink. However, when these sinks 
> are chained with other operators, the chained task could not support 
> speculative execution if there is any operator in chain does not support 
> speculative execution. That's the main idea behind this.
> FileSink and FileSystemOutputFormat are the most typical implementation for 
> SinkV2 and OutputFormat. We would not enable speculative execution for the 
> StreamingFileSink, because it's marked as deprecated.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30842) Add document for sink supports speculative execution

2023-01-30 Thread Biao Liu (Jira)
Biao Liu created FLINK-30842:


 Summary: Add document for sink supports speculative execution
 Key: FLINK-30842
 URL: https://issues.apache.org/jira/browse/FLINK-30842
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Biao Liu
 Fix For: 1.17.0


Add documentation describing how sinks support speculative execution.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30823) Enable speculative execution for some of the typical built-in sinks

2023-01-30 Thread Biao Liu (Jira)
Biao Liu created FLINK-30823:


 Summary: Enable speculative execution for some of the typical 
built-in sinks
 Key: FLINK-30823
 URL: https://issues.apache.org/jira/browse/FLINK-30823
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Biao Liu
 Fix For: 1.17.0


Now that sinks support speculative execution, this ticket enables speculative 
execution for some built-in sinks. For each type of sink (SinkV2, SinkFunction, 
OutputFormat) we picked some typical sinks that require no changes or only 
minor ones.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30727) JoinReorderITCase.testBushyTreeJoinReorder failed due to IOException

2023-01-29 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17681722#comment-17681722
 ] 

Biao Liu commented on FLINK-30727:
--

Another instance: 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=45311&view=logs&j=0c940707-2659-5648-cbe6-a1ad63045f0a&t=075c2716-8010-5565-fe08-3c4bb45824a4

> JoinReorderITCase.testBushyTreeJoinReorder failed due to IOException
> 
>
> Key: FLINK-30727
> URL: https://issues.apache.org/jira/browse/FLINK-30727
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network, Table SQL / Planner
>Affects Versions: 1.17.0
>Reporter: Matthias Pohl
>Assignee: Yunhong Zheng
>Priority: Blocker
>  Labels: pull-request-available, test-stability
>
> IOException due to timeout occurring while requesting exclusive NetworkBuffer 
> caused JoinReorderITCase.testBushyTreeJoinReorder to fail:
> {code}
> [...]
> Jan 18 01:11:27 Caused by: java.io.IOException: Timeout triggered when 
> requesting exclusive buffers: The total number of network buffers is 
> currently set to 2048 of 32768 bytes each. You can increase this number by 
> setting the configuration keys 'taskmanager.memory.network.fraction', 
> 'taskmanager.memory.network.min', and 'taskmanager.memory.network.max',  or 
> you may increase the timeout which is 3ms by setting the key 
> 'taskmanager.network.memory.exclusive-buffers-request-timeout-ms'.
> Jan 18 01:11:27   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalRequestMemorySegments(NetworkBufferPool.java:256)
> Jan 18 01:11:27   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestPooledMemorySegmentsBlocking(NetworkBufferPool.java:179)
> Jan 18 01:11:27   at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.reserveSegments(LocalBufferPool.java:262)
> Jan 18 01:11:27   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setupChannels(SingleInputGate.java:517)
> Jan 18 01:11:27   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:277)
> Jan 18 01:11:27   at 
> org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:105)
> Jan 18 01:11:27   at 
> org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:962)
> Jan 18 01:11:27   at 
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:648)
> Jan 18 01:11:27   at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:556)
> Jan 18 01:11:27   at java.lang.Thread.run(Thread.java:748)
> {code}
> Same build, 2 failures:
> * 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44987&view=logs&j=0c940707-2659-5648-cbe6-a1ad63045f0a&t=075c2716-8010-5565-fe08-3c4bb45824a4&l=14300
> * 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44987&view=logs&j=ce3801ad-3bd5-5f06-d165-34d37e757d90&t=5e4d9387-1dcc-5885-a901-90469b7e6d2f&l=14362



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30799) Make SinkFunction support speculative execution for batch jobs

2023-01-26 Thread Biao Liu (Jira)
Biao Liu created FLINK-30799:


 Summary: Make SinkFunction support speculative execution for batch 
jobs
 Key: FLINK-30799
 URL: https://issues.apache.org/jira/browse/FLINK-30799
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Biao Liu
 Fix For: 1.17.0


In this ticket, we make SinkFunction-based sinks able to run with speculative 
execution for batch jobs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30798) Make OutputFormat support speculative execution for batch jobs

2023-01-26 Thread Biao Liu (Jira)
Biao Liu created FLINK-30798:


 Summary: Make OutputFormat support speculative execution for batch 
jobs
 Key: FLINK-30798
 URL: https://issues.apache.org/jira/browse/FLINK-30798
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Biao Liu
 Fix For: 1.17.0


This ticket makes OutputFormat-based sinks able to run with speculative 
execution for batch jobs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30755) Make SinkV2 support speculative execution for batch jobs

2023-01-19 Thread Biao Liu (Jira)
Biao Liu created FLINK-30755:


 Summary: Make SinkV2 support speculative execution for batch jobs
 Key: FLINK-30755
 URL: https://issues.apache.org/jira/browse/FLINK-30755
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Biao Liu
 Fix For: 1.17.0


This is the first part of the FLIP-281 implementation. In this ticket, we 
introduce some base abstractions for speculative sinks and then make the 
SinkV2 API work with them.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30725) FLIP-281: Sink Supports Speculative Execution For Batch Job

2023-01-17 Thread Biao Liu (Jira)
Biao Liu created FLINK-30725:


 Summary: FLIP-281: Sink Supports Speculative Execution For Batch 
Job
 Key: FLINK-30725
 URL: https://issues.apache.org/jira/browse/FLINK-30725
 Project: Flink
  Issue Type: New Feature
  Components: Runtime / Coordination
Reporter: Biao Liu
 Fix For: 1.17.0


This is the umbrella issue for FLIP-281. More details can be found in 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-281+Sink+Supports+Speculative+Execution+For+Batch+Job.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28980) Release Testing: Verify FLIP-168 speculative execution

2022-08-31 Thread Biao Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-28980:
-
Attachment: stdout

> Release Testing: Verify FLIP-168 speculative execution
> --
>
> Key: FLINK-28980
> URL: https://issues.apache.org/jira/browse/FLINK-28980
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhu Zhu
>Assignee: Biao Liu
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.16.0
>
> Attachments: flink-root-standalonesession-0-VM_38_195_centos.log, 
> flink-root-taskexecutor-0-VM_199_24_centos.log, 
> flink-root-taskexecutor-0-VM_38_195_centos.log, screenshot1, screenshot2, 
> stdout
>
>
> Speculative execution is introduced in Flink 1.16 to deal with temporary slow 
> tasks caused by slow nodes. This feature currently consists of 4 FLIPs:
>  - FLIP-168: Speculative Execution core part
>  - FLIP-224: Blocklist Mechanism
>  - FLIP-245: Source Supports Speculative Execution
>  - FLIP-249: Flink Web UI Enhancement for Speculative Execution
> This ticket aims for verifying FLIP-168, along with FLIP-224 and FLIP-249.
> More details about this feature and how to use it can be found in this 
> [documentation|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/speculative_execution/].
> To do the verification, the process can be:
>  - Write a Flink job which has a subtask running much slower than others 
> (e.g. sleep indefinitely if it runs on a certain host, the hostname can be 
> retrieved via InetAddress.getLocalHost().getHostName(), or if its 
> (subtaskIndex + attemptNumer) % 2 == 0)
>  - Modify Flink configuration file to enable speculative execution and tune 
> the configuration as you like
>  - Submit the job. Checking the web UI, logs, metrics and produced result.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28980) Release Testing: Verify FLIP-168 speculative execution

2022-08-31 Thread Biao Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-28980:
-
Attachment: flink-root-taskexecutor-0-VM_199_24_centos.log

> Release Testing: Verify FLIP-168 speculative execution
> --
>
> Key: FLINK-28980
> URL: https://issues.apache.org/jira/browse/FLINK-28980
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhu Zhu
>Assignee: Biao Liu
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.16.0
>
> Attachments: flink-root-standalonesession-0-VM_38_195_centos.log, 
> flink-root-taskexecutor-0-VM_199_24_centos.log, 
> flink-root-taskexecutor-0-VM_38_195_centos.log, screenshot1, screenshot2, 
> stdout
>
>
> Speculative execution is introduced in Flink 1.16 to deal with temporary slow 
> tasks caused by slow nodes. This feature currently consists of 4 FLIPs:
>  - FLIP-168: Speculative Execution core part
>  - FLIP-224: Blocklist Mechanism
>  - FLIP-245: Source Supports Speculative Execution
>  - FLIP-249: Flink Web UI Enhancement for Speculative Execution
> This ticket aims for verifying FLIP-168, along with FLIP-224 and FLIP-249.
> More details about this feature and how to use it can be found in this 
> [documentation|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/speculative_execution/].
> To do the verification, the process can be:
>  - Write a Flink job which has a subtask running much slower than others 
> (e.g. sleep indefinitely if it runs on a certain host, the hostname can be 
> retrieved via InetAddress.getLocalHost().getHostName(), or if its 
> (subtaskIndex + attemptNumer) % 2 == 0)
>  - Modify Flink configuration file to enable speculative execution and tune 
> the configuration as you like
>  - Submit the job. Checking the web UI, logs, metrics and produced result.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28980) Release Testing: Verify FLIP-168 speculative execution

2022-08-31 Thread Biao Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-28980:
-
Attachment: screenshot1

> Release Testing: Verify FLIP-168 speculative execution
> --
>
> Key: FLINK-28980
> URL: https://issues.apache.org/jira/browse/FLINK-28980
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhu Zhu
>Assignee: Biao Liu
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.16.0
>
> Attachments: flink-root-standalonesession-0-VM_38_195_centos.log, 
> flink-root-taskexecutor-0-VM_199_24_centos.log, 
> flink-root-taskexecutor-0-VM_38_195_centos.log, screenshot1, screenshot2, 
> stdout
>
>
> Speculative execution is introduced in Flink 1.16 to deal with temporary slow 
> tasks caused by slow nodes. This feature currently consists of 4 FLIPs:
>  - FLIP-168: Speculative Execution core part
>  - FLIP-224: Blocklist Mechanism
>  - FLIP-245: Source Supports Speculative Execution
>  - FLIP-249: Flink Web UI Enhancement for Speculative Execution
> This ticket aims for verifying FLIP-168, along with FLIP-224 and FLIP-249.
> More details about this feature and how to use it can be found in this 
> [documentation|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/speculative_execution/].
> To do the verification, the process can be:
>  - Write a Flink job which has a subtask running much slower than others 
> (e.g. sleep indefinitely if it runs on a certain host, the hostname can be 
> retrieved via InetAddress.getLocalHost().getHostName(), or if its 
> (subtaskIndex + attemptNumer) % 2 == 0)
>  - Modify Flink configuration file to enable speculative execution and tune 
> the configuration as you like
>  - Submit the job. Checking the web UI, logs, metrics and produced result.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28980) Release Testing: Verify FLIP-168 speculative execution

2022-08-31 Thread Biao Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-28980:
-
Attachment: screenshot2

> Release Testing: Verify FLIP-168 speculative execution
> --
>
> Key: FLINK-28980
> URL: https://issues.apache.org/jira/browse/FLINK-28980
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhu Zhu
>Assignee: Biao Liu
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.16.0
>
> Attachments: flink-root-standalonesession-0-VM_38_195_centos.log, 
> flink-root-taskexecutor-0-VM_199_24_centos.log, 
> flink-root-taskexecutor-0-VM_38_195_centos.log, screenshot1, screenshot2, 
> stdout
>
>
> Speculative execution is introduced in Flink 1.16 to deal with temporary slow 
> tasks caused by slow nodes. This feature currently consists of 4 FLIPs:
>  - FLIP-168: Speculative Execution core part
>  - FLIP-224: Blocklist Mechanism
>  - FLIP-245: Source Supports Speculative Execution
>  - FLIP-249: Flink Web UI Enhancement for Speculative Execution
> This ticket aims for verifying FLIP-168, along with FLIP-224 and FLIP-249.
> More details about this feature and how to use it can be found in this 
> [documentation|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/speculative_execution/].
> To do the verification, the process can be:
>  - Write a Flink job which has a subtask running much slower than others 
> (e.g. sleep indefinitely if it runs on a certain host, the hostname can be 
> retrieved via InetAddress.getLocalHost().getHostName(), or if its 
> (subtaskIndex + attemptNumer) % 2 == 0)
>  - Modify Flink configuration file to enable speculative execution and tune 
> the configuration as you like
>  - Submit the job. Checking the web UI, logs, metrics and produced result.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28980) Release Testing: Verify FLIP-168 speculative execution

2022-08-31 Thread Biao Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-28980:
-
Attachment: flink-root-taskexecutor-0-VM_38_195_centos.log

> Release Testing: Verify FLIP-168 speculative execution
> --
>
> Key: FLINK-28980
> URL: https://issues.apache.org/jira/browse/FLINK-28980
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhu Zhu
>Assignee: Biao Liu
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.16.0
>
> Attachments: flink-root-standalonesession-0-VM_38_195_centos.log, 
> flink-root-taskexecutor-0-VM_199_24_centos.log, 
> flink-root-taskexecutor-0-VM_38_195_centos.log, screenshot1, screenshot2, 
> stdout
>
>
> Speculative execution is introduced in Flink 1.16 to deal with temporary slow 
> tasks caused by slow nodes. This feature currently consists of 4 FLIPs:
>  - FLIP-168: Speculative Execution core part
>  - FLIP-224: Blocklist Mechanism
>  - FLIP-245: Source Supports Speculative Execution
>  - FLIP-249: Flink Web UI Enhancement for Speculative Execution
> This ticket aims for verifying FLIP-168, along with FLIP-224 and FLIP-249.
> More details about this feature and how to use it can be found in this 
> [documentation|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/speculative_execution/].
> To do the verification, the process can be:
>  - Write a Flink job which has a subtask running much slower than others 
> (e.g. sleep indefinitely if it runs on a certain host, the hostname can be 
> retrieved via InetAddress.getLocalHost().getHostName(), or if its 
> (subtaskIndex + attemptNumer) % 2 == 0)
>  - Modify Flink configuration file to enable speculative execution and tune 
> the configuration as you like
>  - Submit the job. Checking the web UI, logs, metrics and produced result.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28980) Release Testing: Verify FLIP-168 speculative execution

2022-08-31 Thread Biao Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-28980:
-
Attachment: flink-root-standalonesession-0-VM_38_195_centos.log

> Release Testing: Verify FLIP-168 speculative execution
> --
>
> Key: FLINK-28980
> URL: https://issues.apache.org/jira/browse/FLINK-28980
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhu Zhu
>Assignee: Biao Liu
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.16.0
>
> Attachments: flink-root-standalonesession-0-VM_38_195_centos.log, 
> flink-root-taskexecutor-0-VM_199_24_centos.log, 
> flink-root-taskexecutor-0-VM_38_195_centos.log, screenshot1, screenshot2, 
> stdout
>
>
> Speculative execution is introduced in Flink 1.16 to deal with temporary slow 
> tasks caused by slow nodes. This feature currently consists of 4 FLIPs:
>  - FLIP-168: Speculative Execution core part
>  - FLIP-224: Blocklist Mechanism
>  - FLIP-245: Source Supports Speculative Execution
>  - FLIP-249: Flink Web UI Enhancement for Speculative Execution
> This ticket aims for verifying FLIP-168, along with FLIP-224 and FLIP-249.
> More details about this feature and how to use it can be found in this 
> [documentation|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/speculative_execution/].
> To do the verification, the process can be:
>  - Write a Flink job which has a subtask running much slower than others 
> (e.g. sleep indefinitely if it runs on a certain host, the hostname can be 
> retrieved via InetAddress.getLocalHost().getHostName(), or if its 
> (subtaskIndex + attemptNumer) % 2 == 0)
>  - Modify Flink configuration file to enable speculative execution and tune 
> the configuration as you like
>  - Submit the job. Checking the web UI, logs, metrics and produced result.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28980) Release Testing: Verify FLIP-168 speculative execution

2022-08-31 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17598422#comment-17598422
 ] 

Biao Liu commented on FLINK-28980:
--

I've tested the scenario and it looks good to me.

I started two TMs on different machines, each with two slots. I used hostname 
checking via InetAddress.getLocalHost().getHostName() to make one task much 
slower than the others (the operator has three subtasks), and I set 
"slow-task-detector.execution-time.baseline-ratio" to 0.5. The speculative 
attempt was launched as expected. I checked the web UI, metrics, logs and the 
produced result, and everything works fine. Some screenshots and log files are 
attached.
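
For reference, a minimal sketch of the kind of hostname-checking function used 
to make one subtask slow (the host name below is a placeholder for the machine 
chosen to be slow; this is not the exact test code):
{code}
import java.net.InetAddress;
import org.apache.flink.api.common.functions.MapFunction;

/**
 * Sketch: slows down only the subtasks that run on one designated host, so that a
 * speculative attempt is expected to be launched for them on another machine.
 */
public class SlowOnOneHostMapper implements MapFunction<Long, Long> {

    private static final String SLOW_HOST = "slow-host-name"; // placeholder

    @Override
    public Long map(Long value) throws Exception {
        if (InetAddress.getLocalHost().getHostName().startsWith(SLOW_HOST)) {
            Thread.sleep(1000); // make every record slow on the designated host
        }
        return value;
    }
}
{code}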

> Release Testing: Verify FLIP-168 speculative execution
> --
>
> Key: FLINK-28980
> URL: https://issues.apache.org/jira/browse/FLINK-28980
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhu Zhu
>Assignee: Biao Liu
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.16.0
>
>
> Speculative execution is introduced in Flink 1.16 to deal with temporary slow 
> tasks caused by slow nodes. This feature currently consists of 4 FLIPs:
>  - FLIP-168: Speculative Execution core part
>  - FLIP-224: Blocklist Mechanism
>  - FLIP-245: Source Supports Speculative Execution
>  - FLIP-249: Flink Web UI Enhancement for Speculative Execution
> This ticket aims for verifying FLIP-168, along with FLIP-224 and FLIP-249.
> More details about this feature and how to use it can be found in this 
> [documentation|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/speculative_execution/].
> To do the verification, the process can be:
>  - Write a Flink job which has a subtask running much slower than others 
> (e.g. sleep indefinitely if it runs on a certain host, the hostname can be 
> retrieved via InetAddress.getLocalHost().getHostName(), or if its 
> (subtaskIndex + attemptNumer) % 2 == 0)
>  - Modify Flink configuration file to enable speculative execution and tune 
> the configuration as you like
>  - Submit the job. Checking the web UI, logs, metrics and produced result.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28980) Release Testing: Verify FLIP-168 speculative execution

2022-08-30 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17597890#comment-17597890
 ] 

Biao Liu commented on FLINK-28980:
--

[~godfreyhe] Sorry for the late response, I missed the notification. I'm 
working on it; it should be finished today or tomorrow.

> Release Testing: Verify FLIP-168 speculative execution
> --
>
> Key: FLINK-28980
> URL: https://issues.apache.org/jira/browse/FLINK-28980
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhu Zhu
>Assignee: Biao Liu
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.16.0
>
>
> Speculative execution is introduced in Flink 1.16 to deal with temporary slow 
> tasks caused by slow nodes. This feature currently consists of 4 FLIPs:
>  - FLIP-168: Speculative Execution core part
>  - FLIP-224: Blocklist Mechanism
>  - FLIP-245: Source Supports Speculative Execution
>  - FLIP-249: Flink Web UI Enhancement for Speculative Execution
> This ticket aims for verifying FLIP-168, along with FLIP-224 and FLIP-249.
> More details about this feature and how to use it can be found in this 
> [documentation|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/speculative_execution/].
> To do the verification, the process can be:
>  - Write a Flink job which has a subtask running much slower than others 
> (e.g. sleep indefinitely if it runs on a certain host, the hostname can be 
> retrieved via InetAddress.getLocalHost().getHostName(), or if its 
> (subtaskIndex + attemptNumer) % 2 == 0)
>  - Modify Flink configuration file to enable speculative execution and tune 
> the configuration as you like
>  - Submit the job. Checking the web UI, logs, metrics and produced result.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-17073) Slow checkpoint cleanup causing OOMs

2020-08-03 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17170015#comment-17170015
 ] 

Biao Liu commented on FLINK-17073:
--

[~echauchot], sorry for the late reply. Thanks for pushing this!

I'm OK with [~roman_khachatryan]'s plan. It is indeed simpler to implement in 
some respects. In my plan we would have to consider how to avoid the 
synchronous cleaning you mentioned, because in the near future 
{{CheckpointCoordinator}} will no longer have a big lock.

{quote}...we can drop new checkpoint requests when there are too many 
checkpoints to clean...{quote}
I think we should take care of cleaning both successful and failed checkpoints.

I have left some comments in the doc.

> Slow checkpoint cleanup causing OOMs
> 
>
> Key: FLINK-17073
> URL: https://issues.apache.org/jira/browse/FLINK-17073
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.7.3, 1.8.0, 1.9.0, 1.10.0, 1.11.0
>Reporter: Till Rohrmann
>Assignee: Etienne Chauchot
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>
> A user reported that he sees a decline in checkpoint cleanup speed when 
> upgrading from Flink 1.7.2 to 1.10.0. The result is that a lot of cleanup 
> tasks are waiting in the execution queue occupying memory. Ultimately, the JM 
> process dies with an OOM.
> Compared to Flink 1.7.2, we introduced a dedicated {{ioExecutor}} which is 
> used by the {{HighAvailabilityServices}} (FLINK-11851). Before, we use the 
> {{AkkaRpcService}} thread pool which was a {{ForkJoinPool}} with a max 
> parallelism of 64. Now it is a {{FixedThreadPool}} with as many threads as 
> CPU cores. This change might have caused the decline in completed checkpoint 
> discard throughput. This suspicion needs to be validated before trying to fix 
> it!
> [1] 
> https://lists.apache.org/thread.html/r390e5d775878918edca0b6c9f18de96f828c266a888e34ed30ce8494%40%3Cuser.flink.apache.org%3E



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16945) Execute CheckpointFailureManager.FailJobCallback directly in main thread executor

2020-07-30 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17168439#comment-17168439
 ] 

Biao Liu commented on FLINK-16945:
--

[~dian.fu], yes, I'm working on this. It depends on some other issues for which 
I'm preparing PRs.

> Execute CheckpointFailureManager.FailJobCallback directly in main thread 
> executor
> -
>
> Key: FLINK-16945
> URL: https://issues.apache.org/jira/browse/FLINK-16945
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.10.0
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Since we have put all non-IO operations of {{CheckpointCoordinator}} into 
> main thread executor, the {{CheckpointFailureManager.FailJobCallback}} could 
> be executed directly now. In this way execution graph would fail immediately 
> when {{CheckpointFailureManager}} invokes the callback. We could avoid the 
> inconsistent scenario of FLINK-13497.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17073) Slow checkpoint cleanup causing OOMs

2020-07-28 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17166244#comment-17166244
 ] 

Biao Liu commented on FLINK-17073:
--

BTW [~echauchot], before writing any code, it would be great to write an 
implementation plan first. That's a better place to discuss implementation 
details.
I heard that some other people are also interested in this issue, and a plan 
would help them understand what is happening. Besides that, there will be some 
other PRs touching {{CheckpointCoordinator}} at the same time, and we have to 
make sure there are no big conflicts between these changes.

> Slow checkpoint cleanup causing OOMs
> 
>
> Key: FLINK-17073
> URL: https://issues.apache.org/jira/browse/FLINK-17073
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.7.3, 1.8.0, 1.9.0, 1.10.0, 1.11.0
>Reporter: Till Rohrmann
>Assignee: Etienne Chauchot
>Priority: Major
> Fix For: 1.12.0
>
>
> A user reported that he sees a decline in checkpoint cleanup speed when 
> upgrading from Flink 1.7.2 to 1.10.0. The result is that a lot of cleanup 
> tasks are waiting in the execution queue occupying memory. Ultimately, the JM 
> process dies with an OOM.
> Compared to Flink 1.7.2, we introduced a dedicated {{ioExecutor}} which is 
> used by the {{HighAvailabilityServices}} (FLINK-11851). Before, we use the 
> {{AkkaRpcService}} thread pool which was a {{ForkJoinPool}} with a max 
> parallelism of 64. Now it is a {{FixedThreadPool}} with as many threads as 
> CPU cores. This change might have caused the decline in completed checkpoint 
> discard throughput. This suspicion needs to be validated before trying to fix 
> it!
> [1] 
> https://lists.apache.org/thread.html/r390e5d775878918edca0b6c9f18de96f828c266a888e34ed30ce8494%40%3Cuser.flink.apache.org%3E



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18641) "Failure to finalize checkpoint" error in MasterTriggerRestoreHook

2020-07-27 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17166057#comment-17166057
 ] 

Biao Liu commented on FLINK-18641:
--

[~becket_qin], [~pnowojski], the ticket has not been assigned yet. Is there 
anyone working on this?

> "Failure to finalize checkpoint" error in MasterTriggerRestoreHook
> --
>
> Key: FLINK-18641
> URL: https://issues.apache.org/jira/browse/FLINK-18641
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.11.0
>Reporter: Brian Zhou
>Priority: Major
>
> https://github.com/pravega/flink-connectors is a Pravega connector for Flink. 
> The ReaderCheckpointHook[1] class uses the Flink `MasterTriggerRestoreHook` 
> interface to trigger the Pravega checkpoint during Flink checkpoints to make 
> sure the data recovery. The checkpoint recovery tests are running fine in 
> Flink 1.10, but it has below issues in Flink 1.11 causing the tests time out. 
> Suspect it is related to the checkpoint coordinator thread model changes in 
> Flink 1.11
> Error stacktrace:
> {code}
> 2020-07-09 15:39:39,999 30945 [jobmanager-future-thread-5] WARN  
> o.a.f.runtime.jobmaster.JobMaster - Error while processing checkpoint 
> acknowledgement message
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize 
> the pending checkpoint 3. Failure reason: Failure to finalize checkpoint.
>  at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1033)
>  at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:948)
>  at 
> org.apache.flink.runtime.scheduler.SchedulerBase.lambda$acknowledgeCheckpoint$4(SchedulerBase.java:802)
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.SerializedThrowable: Pending checkpoint has 
> not been fully acknowledged yet
>  at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>  at 
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:298)
>  at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1021)
>  ... 9 common frames omitted
> {code}
> More detail in this mailing thread: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Pravega-connector-cannot-recover-from-the-checkpoint-due-to-quot-Failure-to-finalize-checkpoint-quot-td36652.html
> Also in https://github.com/pravega/flink-connectors/issues/387



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17073) Slow checkpoint cleanup causing OOMs

2020-07-27 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17165518#comment-17165518
 ] 

Biao Liu commented on FLINK-17073:
--

To [~roman_khachatryan], thanks for the nice suggestions!

{quote}I think an alternative (or complementary) temporary solution is to use a 
bounded queue when creating ioExecutor.{quote}
I'm not a fan of this temporary solution. We would have to decide how to treat 
callers that submit asynchronous IO operations through {{ioExecutor}} when the 
queue is full: fail them, or make them wait until space becomes available? I'm 
afraid it's no small job to review all the places that call {{ioExecutor}}. If 
we want a temporary solution, maybe we could just increase the thread count.
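
Just to make the trade-off concrete, a sketch (not a proposal for the actual 
Flink code) of what a bounded-queue ioExecutor would look like; the queue 
capacity and the rejection policy are exactly the open questions above:
{code}
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Sketch only. */
public final class BoundedIoExecutorSketch {

    /**
     * A fixed-size pool with a bounded work queue. The RejectedExecutionHandler decides
     * what happens when the queue is full: AbortPolicy fails the submitting caller,
     * while CallerRunsPolicy makes the caller execute the task inline (i.e. wait).
     */
    public static ExecutorService create(int queueCapacity) {
        int threads = Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(
                threads,
                threads,
                0L,
                TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.CallerRunsPolicy()); // or AbortPolicy, or a custom handler
    }

    private BoundedIoExecutorSketch() {}
}
{code}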

Regarding the long-term solution: Etienne and I have not actually discussed 
many of the implementation details. I just gave some suggestions to make sure 
it is heading in the right direction. It's great to have your detailed 
suggestions; they could help a contributor who is not familiar with this part a 
lot. I just thought we don't need to discuss too many details here. It might be 
better to give the contributor more freedom and pay more attention during code 
review to guarantee the result is correct and reasonable.

BTW, just a small suggestion: code refactoring is not necessary right now; we 
should focus on solving the issue first. After that, we can consider whether 
some refactoring would make the code more readable or elegant.

To [~echauchot], apart from the implementation, are there any questions about 
the plan? Please feel free to ask about anything that is unclear.

> Slow checkpoint cleanup causing OOMs
> 
>
> Key: FLINK-17073
> URL: https://issues.apache.org/jira/browse/FLINK-17073
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.7.3, 1.8.0, 1.9.0, 1.10.0, 1.11.0
>Reporter: Till Rohrmann
>Assignee: Etienne Chauchot
>Priority: Major
> Fix For: 1.12.0
>
>
> A user reported that he sees a decline in checkpoint cleanup speed when 
> upgrading from Flink 1.7.2 to 1.10.0. The result is that a lot of cleanup 
> tasks are waiting in the execution queue occupying memory. Ultimately, the JM 
> process dies with an OOM.
> Compared to Flink 1.7.2, we introduced a dedicated {{ioExecutor}} which is 
> used by the {{HighAvailabilityServices}} (FLINK-11851). Before, we use the 
> {{AkkaRpcService}} thread pool which was a {{ForkJoinPool}} with a max 
> parallelism of 64. Now it is a {{FixedThreadPool}} with as many threads as 
> CPU cores. This change might have caused the decline in completed checkpoint 
> discard throughput. This suspicion needs to be validated before trying to fix 
> it!
> [1] 
> https://lists.apache.org/thread.html/r390e5d775878918edca0b6c9f18de96f828c266a888e34ed30ce8494%40%3Cuser.flink.apache.org%3E



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18641) "Failure to finalize checkpoint" error in MasterTriggerRestoreHook

2020-07-23 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163341#comment-17163341
 ] 

Biao Liu commented on FLINK-18641:
--

Ah, sorry [~pnowojski], I forgot that this change had not been merged to 
release-1.10 at that time; it was only merged to master.

[~becket_qin], thanks for correcting it. Your response covers two different 
things: the master hooks and {{OperatorCoordinator}}. Regarding this ticket, it 
is not caused by the {{OperatorCoordinator}} part, right? I think we could file 
another ticket for that and discuss it there.

{quote}By design, the checkpoint should always actually take the snapshot of 
the master hooks and OperatorCoordinator first before taking the checkpoint on 
the tasks.{quote}
Technically speaking, there is no clear semantic guarantee that the master 
hooks are taken before the task snapshots. For {{ExternallyInducedSource}}, the 
task snapshot might be taken before a master hook completes the future it 
returned. And if there are multiple master hooks, some of them might be invoked 
after the task snapshots. It is somewhat concurrent; I don't think we should, 
or could, guarantee the ordering here.

Anyway, we have to fix the {{ExternallyInducedSource}} issue caused by the 
ordering.
Regarding the fixing plan: I'm not sure how heavy the {{OperatorCoordinator}} 
fix might be. If it's not a simple fix, it might be better to separate these 
things into different patches. As a quick fix for this ticket, we could invoke 
the master hooks synchronously while retaining the coordinator-wide lock, just 
like before.

{quote}I agree that In long run, the operator coordinator can actually 
supersede the master hooks. So we can probably mark the master hooks as 
deprecated.{quote}
Totally agree!

> "Failure to finalize checkpoint" error in MasterTriggerRestoreHook
> --
>
> Key: FLINK-18641
> URL: https://issues.apache.org/jira/browse/FLINK-18641
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.11.0
>Reporter: Brian Zhou
>Priority: Major
>
> https://github.com/pravega/flink-connectors is a Pravega connector for Flink. 
> The ReaderCheckpointHook[1] class uses the Flink `MasterTriggerRestoreHook` 
> interface to trigger the Pravega checkpoint during Flink checkpoints to make 
> sure the data recovery. The checkpoint recovery tests are running fine in 
> Flink 1.10, but it has below issues in Flink 1.11 causing the tests time out. 
> Suspect it is related to the checkpoint coordinator thread model changes in 
> Flink 1.11
> Error stacktrace:
> {code}
> 2020-07-09 15:39:39,999 30945 [jobmanager-future-thread-5] WARN  
> o.a.f.runtime.jobmaster.JobMaster - Error while processing checkpoint 
> acknowledgement message
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize 
> the pending checkpoint 3. Failure reason: Failure to finalize checkpoint.
>  at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1033)
>  at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:948)
>  at 
> org.apache.flink.runtime.scheduler.SchedulerBase.lambda$acknowledgeCheckpoint$4(SchedulerBase.java:802)
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.SerializedThrowable: Pending checkpoint has 
> not been fully acknowledged yet
>  at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>  at 
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:298)
>  at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1021)
>  ... 9 common frames omitted
> {code}
> More detail in this mailing thread: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Pravega-connector-cannot-recover-from-the-checkpoint-due-to-quot-Failure-to-finalize-checkpoint-quot-td36652.html
> Also in https://github.com/pravega/flink-connectors/issues/387



--

[jira] [Commented] (FLINK-18641) "Failure to finalize checkpoint" error in MasterTriggerRestoreHook

2020-07-22 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17162553#comment-17162553
 ] 

Biao Liu commented on FLINK-18641:
--

Thanks [~becket_qin] for analyzing this issue. The asynchronous checkpoint 
threading model breaks the assumption that an {{ExternallyInducedSource}} may 
trigger its checkpoint before {{MasterTriggerRestoreHook}} completes the 
trigger future. We can find a way to guarantee that again. However, that is not 
friendly to hooks that do some initialization or preparation in 
{{MasterTriggerRestoreHook}}, because the checkpoint might be triggered before 
the initialization or preparation finishes. Hope nobody uses it like that :(

Currently the semantics of {{ExternallyInducedSource}} are tightly bound to the 
implementation of Flink's checkpointing, which should be avoided IMO. I think 
we should redesign {{ExternallyInducedSource}} as a long-term goal.

To [~becket_qin], do you already have an idea for fixing it? If not, I could 
help fix it.

BTW, this change of {{CheckpointCoordinator}} was introduced in 1.10. Is it 
possible that the test failure is exposed by the {{OperatorCoordinator}} 
change, since it adds another asynchronous step between master hook triggering 
and task triggering? I'm not sure whether an {{OperatorCoordinator}} is 
necessarily involved in the Pravega connector test scenario. If not, a 
work-around is to complete the future returned by 
{{MasterTriggerRestoreHook.triggerCheckpoint}} before the task checkpoints are 
triggered (I assume there is only one master hook in this case).
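
For illustration, a hedged sketch of that work-around: a hook whose trigger 
future is already completed when {{triggerCheckpoint}} returns. The 
{{MasterTriggerRestoreHook}} signatures below are written from memory of the 
1.11-era interface and should be treated as approximate, not authoritative:
{code}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;

/**
 * Sketch of the work-around: do the external checkpoint work synchronously so the
 * returned future is already complete before the coordinator moves on to task triggering.
 */
public class EagerlyCompletingHook implements MasterTriggerRestoreHook<Void> {

    @Override
    public String getIdentifier() {
        return "eagerly-completing-hook";
    }

    @Override
    public CompletableFuture<Void> triggerCheckpoint(
            long checkpointId, long timestamp, Executor executor) throws Exception {
        // Perform (and wait for) the external checkpoint action here, instead of
        // scheduling it on the executor and completing the future later.
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public void restoreCheckpoint(long checkpointId, Void checkpointData) throws Exception {
        // Nothing to restore in this sketch.
    }

    @Override
    public SimpleVersionedSerializer<Void> createCheckpointDataSerializer() {
        return null; // this sketch carries no checkpoint data
    }
}
{code}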

> "Failure to finalize checkpoint" error in MasterTriggerRestoreHook
> --
>
> Key: FLINK-18641
> URL: https://issues.apache.org/jira/browse/FLINK-18641
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.11.0
>Reporter: Brian Zhou
>Priority: Major
>
> https://github.com/pravega/flink-connectors is a Pravega connector for Flink. 
> The ReaderCheckpointHook[1] class uses the Flink `MasterTriggerRestoreHook` 
> interface to trigger the Pravega checkpoint during Flink checkpoints to make 
> sure the data recovery. The checkpoint recovery tests are running fine in 
> Flink 1.10, but it has below issues in Flink 1.11 causing the tests time out. 
> Suspect it is related to the checkpoint coordinator thread model changes in 
> Flink 1.11
> Error stacktrace:
> {code}
> 2020-07-09 15:39:39,999 30945 [jobmanager-future-thread-5] WARN  
> o.a.f.runtime.jobmaster.JobMaster - Error while processing checkpoint 
> acknowledgement message
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize 
> the pending checkpoint 3. Failure reason: Failure to finalize checkpoint.
>  at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1033)
>  at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:948)
>  at 
> org.apache.flink.runtime.scheduler.SchedulerBase.lambda$acknowledgeCheckpoint$4(SchedulerBase.java:802)
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.SerializedThrowable: Pending checkpoint has 
> not been fully acknowledged yet
>  at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>  at 
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:298)
>  at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1021)
>  ... 9 common frames omitted
> {code}
> More detail in this mailing thread: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Pravega-connector-cannot-recover-from-the-checkpoint-due-to-quot-Failure-to-finalize-checkpoint-quot-td36652.html
> Also in https://github.com/pravega/flink-connectors/issues/387



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17073) Slow checkpoint cleanup causing OOMs

2020-07-20 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17161657#comment-17161657
 ] 

Biao Liu commented on FLINK-17073:
--

[~echauchot], sorry, I don't have the permission to assign issues. 
[~pnowojski], could you help assign the ticket to him?

> Slow checkpoint cleanup causing OOMs
> 
>
> Key: FLINK-17073
> URL: https://issues.apache.org/jira/browse/FLINK-17073
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.7.3, 1.8.0, 1.9.0, 1.10.0, 1.11.0
>Reporter: Till Rohrmann
>Priority: Major
> Fix For: 1.12.0
>
>
> A user reported that he sees a decline in checkpoint cleanup speed when 
> upgrading from Flink 1.7.2 to 1.10.0. The result is that a lot of cleanup 
> tasks are waiting in the execution queue occupying memory. Ultimately, the JM 
> process dies with an OOM.
> Compared to Flink 1.7.2, we introduced a dedicated {{ioExecutor}} which is 
> used by the {{HighAvailabilityServices}} (FLINK-11851). Before, we use the 
> {{AkkaRpcService}} thread pool which was a {{ForkJoinPool}} with a max 
> parallelism of 64. Now it is a {{FixedThreadPool}} with as many threads as 
> CPU cores. This change might have caused the decline in completed checkpoint 
> discard throughput. This suspicion needs to be validated before trying to fix 
> it!
> [1] 
> https://lists.apache.org/thread.html/r390e5d775878918edca0b6c9f18de96f828c266a888e34ed30ce8494%40%3Cuser.flink.apache.org%3E



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17073) Slow checkpoint cleanup causing OOMs

2020-07-05 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17151618#comment-17151618
 ] 

Biao Liu commented on FLINK-17073:
--

Hi [~echauchot], thanks for doing so much. I left a couple of comments in the 
design doc. The second proposal seems to be a reliable and lightweight solution :)

> Slow checkpoint cleanup causing OOMs
> 
>
> Key: FLINK-17073
> URL: https://issues.apache.org/jira/browse/FLINK-17073
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.7.3, 1.8.0, 1.9.0, 1.10.0, 1.11.0
>Reporter: Till Rohrmann
>Priority: Major
> Fix For: 1.12.0
>
>
> A user reported that he sees a decline in checkpoint cleanup speed when 
> upgrading from Flink 1.7.2 to 1.10.0. The result is that a lot of cleanup 
> tasks are waiting in the execution queue occupying memory. Ultimately, the JM 
> process dies with an OOM.
> Compared to Flink 1.7.2, we introduced a dedicated {{ioExecutor}} which is 
> used by the {{HighAvailabilityServices}} (FLINK-11851). Before, we use the 
> {{AkkaRpcService}} thread pool which was a {{ForkJoinPool}} with a max 
> parallelism of 64. Now it is a {{FixedThreadPool}} with as many threads as 
> CPU cores. This change might have caused the decline in completed checkpoint 
> discard throughput. This suspicion needs to be validated before trying to fix 
> it!
> [1] 
> https://lists.apache.org/thread.html/r390e5d775878918edca0b6c9f18de96f828c266a888e34ed30ce8494%40%3Cuser.flink.apache.org%3E



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18137) JobMasterTriggerSavepointITCase.testStopJobAfterSavepoint fails with AskTimeoutException

2020-06-11 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1717#comment-1717
 ] 

Biao Liu commented on FLINK-18137:
--

I just saw this issue. I think [~trohrmann] is right.
There is a problem with the if/else in 
[https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L547].
 The {{throwable}} passed to {{onTriggerFailure}} can unexpectedly be null. 
Actually that's my fault: I wrote this code and realized the issue some days 
ago. I was planning to fix it in a later PR, because I had checked at the time 
that it could not raise an NPE, so I didn't consider it urgent. However, 
FLINK-16770 broke that plan: I reverted a lot of code and forgot to fix this 
potential issue separately. Unfortunately 
https://github.com/apache/flink/commit/1af33f1285d557f0171f4587d7f4e789df27e7cb 
hits this NPE. {{onTriggerFailure}} shouldn't throw any exception by design.
The code has changed a bit since my last commit. I need to double-check the 
comment mentioned by [~roman_khachatryan] to make sure there is no other issue.
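
Just to illustrate the kind of guard I have in mind (the method signature and 
the fallback exception are illustrative, not the actual coordinator code):
{code:java}
// Never pass a null throwable on, and never let an exception escape this method.
private void onTriggerFailure(PendingCheckpoint checkpoint, @Nullable Throwable throwable) {
    final Throwable cause =
            throwable != null
                    ? throwable
                    : new FlinkException("Failed to trigger checkpoint for an unknown reason.");
    // ... abort the pending checkpoint with "cause" here
}
{code}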


> JobMasterTriggerSavepointITCase.testStopJobAfterSavepoint fails with 
> AskTimeoutException
> 
>
> Key: FLINK-18137
> URL: https://issues.apache.org/jira/browse/FLINK-18137
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / Coordination, Runtime 
> / Task, Tests
>Affects Versions: 1.11.0, 1.12.0
>Reporter: Robert Metzger
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.11.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=2747&view=logs&j=5c8e7682-d68f-54d1-16a2-a09310218a49&t=45cc9205-bdb7-5b54-63cd-89fdc0983323
> {code}
> 2020-06-04T16:17:20.4404189Z [ERROR] Tests run: 4, Failures: 0, Errors: 1, 
> Skipped: 0, Time elapsed: 14.352 s <<< FAILURE! - in 
> org.apache.flink.runtime.jobmaster.JobMasterTriggerSavepointITCase
> 2020-06-04T16:17:20.4405548Z [ERROR] 
> testStopJobAfterSavepoint(org.apache.flink.runtime.jobmaster.JobMasterTriggerSavepointITCase)
>   Time elapsed: 10.058 s  <<< ERROR!
> 2020-06-04T16:17:20.4407342Z java.util.concurrent.ExecutionException: 
> java.util.concurrent.TimeoutException: Invocation of public default 
> java.util.concurrent.CompletableFuture 
> org.apache.flink.runtime.webmonitor.RestfulGateway.triggerSavepoint(org.apache.flink.api.common.JobID,java.lang.String,boolean,org.apache.flink.api.common.time.Time)
>  timed out.
> 2020-06-04T16:17:20.4409562Z  at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> 2020-06-04T16:17:20.4410333Z  at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> 2020-06-04T16:17:20.4411259Z  at 
> org.apache.flink.runtime.jobmaster.JobMasterTriggerSavepointITCase.cancelWithSavepoint(JobMasterTriggerSavepointITCase.java:264)
> 2020-06-04T16:17:20.4412292Z  at 
> org.apache.flink.runtime.jobmaster.JobMasterTriggerSavepointITCase.testStopJobAfterSavepoint(JobMasterTriggerSavepointITCase.java:127)
> 2020-06-04T16:17:20.4413163Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2020-06-04T16:17:20.4413990Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2020-06-04T16:17:20.4414783Z  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2020-06-04T16:17:20.4415936Z  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2020-06-04T16:17:20.4416693Z  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 2020-06-04T16:17:20.4417632Z  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2020-06-04T16:17:20.4418637Z  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 2020-06-04T16:17:20.4419367Z  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2020-06-04T16:17:20.4420118Z  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> 2020-06-04T16:17:20.4420742Z  at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> 2020-06-04T16:17:20.4421909Z  at 
> org.junit.rules.RunRules.evaluate(RunRules.java:20)
> 2020-06-04T16:17:20.4422493Z  at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> 2020-06-04T16:17:20.4423247Z  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> 2020-06-04T16:17:20.4424263Z  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> 2020-06-04T16:17:20.4424876Z  at 
> org.junit.runners.P

[jira] [Commented] (FLINK-14971) Make all the non-IO operations in CheckpointCoordinator single-threaded

2020-06-09 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17129274#comment-17129274
 ] 

Biao Liu commented on FLINK-14971:
--

I spent a bit of time recalling Stephan's summary.

The first step, "(1) When the checkpoint is ready (all tasks acked, metadata 
written out), Checkpoint Coordinator transfers ownership to the 
CompletedCheckpointStore", sounds good to me. Once a checkpoint is ready, it 
can't be cancelled; otherwise we would need to think about how to revert the 
{{CompletedCheckpointStore}}. That simplifies the scenario a lot.

Let's focus on the second step. If I understand correctly, option (b) is a 
bit subtle. If the JM recovers from a checkpoint N which has not been persisted 
to ZK, and then that JM process is gone, the new JM would recover from checkpoint N-1. 
I'm not sure there is no side effect at all on either the JM or the TM side, but my 
gut feeling is that this could be a dangerous semantic. It might break the 
assumptions of some users.

Option (a) is the most feasible one for me. There are some facts behind 
this solution; please correct me if I'm wrong.
 1. The asynchronous commit to the {{CompletedCheckpointStore}} must be done 
first, and only then does the {{CheckpointCoordinator}} notify the tasks that the 
checkpoint is completed. Otherwise the rule "NOTE: It is not fine to ignore it and start from 
an earlier checkpoint if it will get committed later. That is the bug to 
prevent" might be broken. The corner case: the 
{{CheckpointCoordinator}} notifies tasks that checkpoint N is completed first, 
then commits to ZK asynchronously (not yet successful), and the JM process is gone. A 
new JM process starts and recovers from checkpoint N-1, because N-1 is the 
last successful checkpoint recorded in ZK.
 2. If the job fails before the asynchronous commit completes, the 
{{CheckpointCoordinator}} needs to decide how to handle that commit. When the 
commit completes, the JM might be stuck in restoring or other steps (like 
cancelling tasks). I see two options. Option A is to fail this checkpoint, 
revert the {{CheckpointCoordinator}}, and not subsume older checkpoints 
(as described in FLINK-16770). Option B is to treat this checkpoint as a 
successful one but skip notifying the tasks; they are being cancelled or waiting 
to be restarted, so the notification would be meaningless. I think option B is 
simpler and also acceptable because the notification of checkpoint completion is not 
guaranteed anyway. A rough sketch of this ordering is below.
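
The helper names in this sketch are illustrative, not the real 
{{CheckpointCoordinator}} / {{CompletedCheckpointStore}} API:
{code:java}
// 1. Commit the completed checkpoint to ZK on the I/O executor.
// 2. Only after the commit succeeds, notify the tasks from the main thread.
// 3. If the job failed in the meantime, keep the checkpoint as successful but
//    skip the notification (option B above).
CompletableFuture
        .runAsync(() -> commitToCompletedCheckpointStore(completedCheckpoint), ioExecutor)
        .thenRunAsync(() -> {
            if (!coordinatorIsShutDownOrRestarting()) {
                notifyTasksCheckpointComplete(completedCheckpoint.getCheckpointID());
            }
        }, mainThreadExecutor);
{code}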

> Make all the non-IO operations in CheckpointCoordinator single-threaded
> ---
>
> Key: FLINK-14971
> URL: https://issues.apache.org/jira/browse/FLINK-14971
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently the ACK and declined message handling are executed in IO thread. 
> This is the only rest part that non-IO operations are executed in IO thread. 
> It blocks introducing main thread executor for {{CheckpointCoordinator}}. It 
> would be resolved in this task.
> After resolving the ACK and declined message issue, the main thread executor 
> would be introduced into {{CheckpointCoordinator}} to instead of timer 
> thread. However the timer thread would be kept (maybe for a while 
> temporarily) to schedule periodic triggering, since FLINK-13848 is not 
> accepted yet.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-9900) Fix unstable test ZooKeeperHighAvailabilityITCase#testRestoreBehaviourWithFaultyStateHandles

2020-05-06 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-9900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17101335#comment-17101335
 ] 

Biao Liu commented on FLINK-9900:
-

Thanks [~rmetzger] for reporting.
This time, the case failed to submit the job to the cluster. The cluster didn't 
start the job within 10 seconds, so a timeout happened. It's hard to say which 
step it got stuck in. The last log line of the {{JobMaster}} is "Configuring 
application-defined state backend with job/cluster config". I have attached the 
relevant log (mvn-2.log).
[~trohrmann], do you have any idea?

> Fix unstable test 
> ZooKeeperHighAvailabilityITCase#testRestoreBehaviourWithFaultyStateHandles
> 
>
> Key: FLINK-9900
> URL: https://issues.apache.org/jira/browse/FLINK-9900
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.5.1, 1.6.0, 1.9.0
>Reporter: zhangminglei
>Assignee: Biao Liu
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.9.1, 1.10.0
>
> Attachments: mvn-2.log
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> https://api.travis-ci.org/v3/job/405843617/log.txt
> Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 124.598 sec 
> <<< FAILURE! - in 
> org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase
>  
> testRestoreBehaviourWithFaultyStateHandles(org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase)
>  Time elapsed: 120.036 sec <<< ERROR!
>  org.junit.runners.model.TestTimedOutException: test timed out after 12 
> milliseconds
>  at sun.misc.Unsafe.park(Native Method)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
>  at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>  at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
>  at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>  at 
> org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase.testRestoreBehaviourWithFaultyStateHandles(ZooKeeperHighAvailabilityITCase.java:244)
> Results :
> Tests in error: 
>  
> ZooKeeperHighAvailabilityITCase.testRestoreBehaviourWithFaultyStateHandles:244
>  » TestTimedOut
> Tests run: 1453, Failures: 0, Errors: 1, Skipped: 29



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-9900) Fix unstable test ZooKeeperHighAvailabilityITCase#testRestoreBehaviourWithFaultyStateHandles

2020-05-06 Thread Biao Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-9900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-9900:

Attachment: mvn-2.log

> Fix unstable test 
> ZooKeeperHighAvailabilityITCase#testRestoreBehaviourWithFaultyStateHandles
> 
>
> Key: FLINK-9900
> URL: https://issues.apache.org/jira/browse/FLINK-9900
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.5.1, 1.6.0, 1.9.0
>Reporter: zhangminglei
>Assignee: Biao Liu
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.9.1, 1.10.0
>
> Attachments: mvn-2.log
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> https://api.travis-ci.org/v3/job/405843617/log.txt
> Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 124.598 sec 
> <<< FAILURE! - in 
> org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase
>  
> testRestoreBehaviourWithFaultyStateHandles(org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase)
>  Time elapsed: 120.036 sec <<< ERROR!
>  org.junit.runners.model.TestTimedOutException: test timed out after 12 
> milliseconds
>  at sun.misc.Unsafe.park(Native Method)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
>  at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>  at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
>  at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>  at 
> org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase.testRestoreBehaviourWithFaultyStateHandles(ZooKeeperHighAvailabilityITCase.java:244)
> Results :
> Tests in error: 
>  
> ZooKeeperHighAvailabilityITCase.testRestoreBehaviourWithFaultyStateHandles:244
>  » TestTimedOut
> Tests run: 1453, Failures: 0, Errors: 1, Skipped: 29



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-16770) Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test fails with no such file

2020-04-25 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17092098#comment-17092098
 ] 

Biao Liu edited comment on FLINK-16770 at 4/25/20, 7:12 AM:


Technically speaking, the scenario we discussed here should not happen with the 
reverted commits. The finalization of the checkpoint has been reverted to be executed 
synchronously and wrapped in the coordinator-wide lock. There shouldn't be any race 
condition at all. On the other hand, the earlier commits of the refactoring were 
merged over 3 months ago. So to answer [~pnowojski]'s question, I think we 
have reverted enough commits.

I have noticed that there are some logs:
{quote}kill: usage: kill [-s sigspec | -n signum | -sigspec] pid | jobspec ... 
or kill -l [sigspec]
 Killed TM @
{quote}
It seems that at some point there is no TM process. I guess that's not a normal 
scenario. The {{ha_tm_watchdog}} in common_ha.sh should start a new TM before 
killing an old one in this case. What if there is no TM process at all, exited 
or killed unexpectedly? I'm not sure. I think there would not be enough TMs to 
finish the test case, because the {{ha_tm_watchdog}} only starts a new TM if 
there are enough TMs:
{quote}local MISSING_TMS=$((EXPECTED_TMS-RUNNING_TMS))
 if [ ${MISSING_TMS} -eq 0 ]; then
 # start a new TM only if we have exactly the expected number
 "$FLINK_DIR"/bin/taskmanager.sh start > /dev/null
 fi
{quote}
I guess the failure has a different cause, maybe related to the missing TM 
process. But I can't tell what really happened in this case without more 
logs. Is there any way we could find the JM logs? [~rmetzger]


was (Author: sleepy):
Technically speaking, the scenario we discussed here should not happen with the 
reverted codes. The finalization of checkpoint is reverted to be executed 
synchronously and wrapped in the coordinator-wide lock. There shouldn't be race 
condition at all. On the other hand, the earlier commits of the refactoring are 
merged over 3 months ago. So to answer the question of [~pnowojski], I think we 
have reverted enough commits.

I have noticed that there are some logs:
{quote}kill: usage: kill [-s sigspec | -n signum | -sigspec] pid | jobspec ... 
or kill -l [sigspec]
 Killed TM @
{quote}
It seems that there is no TM process at some time. I guess it's not a normal 
scenario. The {{ha_tm_watchdog}} in common_ha.sh should start a new TM before 
killing an old one in this case. What if there is no TM process at all? Exited 
or killed unexpectedly? I'm not sure. I think there will be no enough TM to 
finish the testing case. Because the {{ha_tm_watchdog}} only starts a new TM if 
there are enough TMs,
{quote}local MISSING_TMS=$((EXPECTED_TMS-RUNNING_TMS))
 if [ ${MISSING_TMS} -eq 0 ]; then
 # start a new TM only if we have exactly the expected number
 "$FLINK_DIR"/bin/taskmanager.sh start > /dev/null
 fi{quote}
I guess the failure cause is another one, maybe it's relevant to the "no TM 
process". But I can't tell what really happened in this case without any other 
logs. Is there any way we could find the JM logs? [~rmetzger]

> Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end 
> test fails with no such file
> ---
>
> Key: FLINK-16770
> URL: https://issues.apache.org/jira/browse/FLINK-16770
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Tests
>Affects Versions: 1.11.0
>Reporter: Zhijiang
>Assignee: Yun Tang
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.11.0
>
> Attachments: e2e-output.log, 
> flink-vsts-standalonesession-0-fv-az53.log, image-2020-04-16-11-24-54-549.png
>
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> The log : 
> [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6603&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5]
>  
> There was also the similar problem in 
> https://issues.apache.org/jira/browse/FLINK-16561, but for the case of no 
> parallelism change. And this case is for scaling up. Not quite sure whether 
> the root cause is the same one.
> {code:java}
> 2020-03-25T06:50:31.3894841Z Running 'Resuming Externalized Checkpoint 
> (rocks, incremental, scale up) end-to-end test'
> 2020-03-25T06:50:31.3895308Z 
> ==
> 2020-03-25T06:50:31.3907274Z TEST_DATA_DIR: 
> /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304
> 2020-03-25T06:50:31.5500274Z Flink dist directory: 
> /home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT
> 2020-03-25T06:50:

[jira] [Commented] (FLINK-16770) Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test fails with no such file

2020-04-25 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17092098#comment-17092098
 ] 

Biao Liu commented on FLINK-16770:
--

Technically speaking, the scenario we discussed here should not happen with the 
reverted code. The finalization of the checkpoint has been reverted to be executed 
synchronously and wrapped in the coordinator-wide lock. There shouldn't be any race 
condition at all. On the other hand, the earlier commits of the refactoring were 
merged over 3 months ago. So to answer [~pnowojski]'s question, I think we 
have reverted enough commits.

I have noticed that there are some logs:
{quote}kill: usage: kill [-s sigspec | -n signum | -sigspec] pid | jobspec ... 
or kill -l [sigspec]
 Killed TM @
{quote}
It seems that at some point there is no TM process. I guess that's not a normal 
scenario. The {{ha_tm_watchdog}} in common_ha.sh should start a new TM before 
killing an old one in this case. What if there is no TM process at all, exited 
or killed unexpectedly? I'm not sure. I think there would not be enough TMs to 
finish the test case, because the {{ha_tm_watchdog}} only starts a new TM if 
there are enough TMs:
{quote}local MISSING_TMS=$((EXPECTED_TMS-RUNNING_TMS))
 if [ ${MISSING_TMS} -eq 0 ]; then
 # start a new TM only if we have exactly the expected number
 "$FLINK_DIR"/bin/taskmanager.sh start > /dev/null
 fi{quote}
I guess the failure has a different cause, maybe related to the missing TM 
process. But I can't tell what really happened in this case without more 
logs. Is there any way we could find the JM logs? [~rmetzger]

> Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end 
> test fails with no such file
> ---
>
> Key: FLINK-16770
> URL: https://issues.apache.org/jira/browse/FLINK-16770
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Tests
>Affects Versions: 1.11.0
>Reporter: Zhijiang
>Assignee: Yun Tang
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.11.0
>
> Attachments: e2e-output.log, 
> flink-vsts-standalonesession-0-fv-az53.log, image-2020-04-16-11-24-54-549.png
>
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> The log : 
> [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6603&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5]
>  
> There was also the similar problem in 
> https://issues.apache.org/jira/browse/FLINK-16561, but for the case of no 
> parallelism change. And this case is for scaling up. Not quite sure whether 
> the root cause is the same one.
> {code:java}
> 2020-03-25T06:50:31.3894841Z Running 'Resuming Externalized Checkpoint 
> (rocks, incremental, scale up) end-to-end test'
> 2020-03-25T06:50:31.3895308Z 
> ==
> 2020-03-25T06:50:31.3907274Z TEST_DATA_DIR: 
> /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304
> 2020-03-25T06:50:31.5500274Z Flink dist directory: 
> /home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT
> 2020-03-25T06:50:31.6354639Z Starting cluster.
> 2020-03-25T06:50:31.8871932Z Starting standalonesession daemon on host 
> fv-az655.
> 2020-03-25T06:50:33.5021784Z Starting taskexecutor daemon on host fv-az655.
> 2020-03-25T06:50:33.5152274Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:34.5498116Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:35.6031346Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:36.9848425Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:38.0283377Z Dispatcher REST endpoint is up.
> 2020-03-25T06:50:38.0285490Z Running externalized checkpoints test, with 
> ORIGINAL_DOP=2 NEW_DOP=4 and STATE_BACKEND_TYPE=rocks 
> STATE_BACKEND_FILE_ASYNC=true STATE_BACKEND_ROCKSDB_INCREMENTAL=true 
> SIMULATE_FAILURE=false ...
> 2020-03-25T06:50:46.1754645Z Job (b8cb04e4b1e730585bc616aa352866d0) is 
> running.
> 2020-03-25T06:50:46.1758132Z Waiting for job 
> (b8cb04e4b1e730585bc616aa352866d0) to have at least 1 completed checkpoints 
> ...
> 2020-03-25T06:50:46.3478276Z Waiting for job to process up to 200 records, 
> current progress: 173 records ...
> 2020-03-25T06:50:49.6332988Z Cancelling job b8cb04e4b1e730585bc616aa352866d0.
> 2020-03-25T06:50:50.4875673Z Cancelled job b8cb04e4b1e730585bc616aa352866d0.
> 2020-03-25T06:50:50.5468230Z ls: cannot access 
> '/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304/externalized-chckpt-e2e-backend-dir/b8cb04e4b1e730585bc616aa352

[jira] [Commented] (FLINK-16931) Large _metadata file lead to JobManager not responding when restart

2020-04-17 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085579#comment-17085579
 ] 

Biao Liu commented on FLINK-16931:
--

Hi [~qqibrow], thanks for opening the PR. I'll try to find some time next week 
to take a look. Too many things this week, sadly :(

> Large _metadata file lead to JobManager not responding when restart
> ---
>
> Key: FLINK-16931
> URL: https://issues.apache.org/jira/browse/FLINK-16931
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / Coordination
>Affects Versions: 1.9.2, 1.10.0, 1.11.0
>Reporter: Lu Niu
>Assignee: Lu Niu
>Priority: Critical
> Fix For: 1.11.0
>
>
> When _metadata file is big, JobManager could never recover from checkpoint. 
> It fall into a loop that fetch checkpoint -> JM timeout -> restart. Here is 
> related log: 
> {code:java}
>  2020-04-01 17:08:25,689 INFO 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - 
> Recovering checkpoints from ZooKeeper.
>  2020-04-01 17:08:25,698 INFO 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Found 
> 3 checkpoints in ZooKeeper.
>  2020-04-01 17:08:25,698 INFO 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - 
> Trying to fetch 3 checkpoints from storage.
>  2020-04-01 17:08:25,698 INFO 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - 
> Trying to retrieve checkpoint 50.
>  2020-04-01 17:08:48,589 INFO 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - 
> Trying to retrieve checkpoint 51.
>  2020-04-01 17:09:12,775 INFO org.apache.flink.yarn.YarnResourceManager - The 
> heartbeat of JobManager with id 02500708baf0bb976891c391afd3d7d5 timed out.
> {code}
> Digging into the code, looks like ExecutionGraph::restart runs in JobMaster 
> main thread and finally calls 
> ZooKeeperCompletedCheckpointStore::retrieveCompletedCheckpoint which download 
> file form DFS. The main thread is basically blocked for a while because of 
> this. One possible solution is to making the downloading part async. More 
> things might need to consider as the original change tries to make it 
> single-threaded. [https://github.com/apache/flink/pull/7568]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16770) Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test fails with no such file

2020-04-14 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17083325#comment-17083325
 ] 

Biao Liu commented on FLINK-16770:
--

Thanks [~rmetzger] for the reminder.
[~yunta], good job. Please give me feedback if you need any help.

> Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end 
> test fails with no such file
> ---
>
> Key: FLINK-16770
> URL: https://issues.apache.org/jira/browse/FLINK-16770
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Tests
>Affects Versions: 1.11.0
>Reporter: Zhijiang
>Assignee: Yun Tang
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.11.0
>
> Attachments: e2e-output.log, 
> flink-vsts-standalonesession-0-fv-az53.log
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The log : 
> [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6603&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5]
>  
> There was also the similar problem in 
> https://issues.apache.org/jira/browse/FLINK-16561, but for the case of no 
> parallelism change. And this case is for scaling up. Not quite sure whether 
> the root cause is the same one.
> {code:java}
> 2020-03-25T06:50:31.3894841Z Running 'Resuming Externalized Checkpoint 
> (rocks, incremental, scale up) end-to-end test'
> 2020-03-25T06:50:31.3895308Z 
> ==
> 2020-03-25T06:50:31.3907274Z TEST_DATA_DIR: 
> /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304
> 2020-03-25T06:50:31.5500274Z Flink dist directory: 
> /home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT
> 2020-03-25T06:50:31.6354639Z Starting cluster.
> 2020-03-25T06:50:31.8871932Z Starting standalonesession daemon on host 
> fv-az655.
> 2020-03-25T06:50:33.5021784Z Starting taskexecutor daemon on host fv-az655.
> 2020-03-25T06:50:33.5152274Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:34.5498116Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:35.6031346Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:36.9848425Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:38.0283377Z Dispatcher REST endpoint is up.
> 2020-03-25T06:50:38.0285490Z Running externalized checkpoints test, with 
> ORIGINAL_DOP=2 NEW_DOP=4 and STATE_BACKEND_TYPE=rocks 
> STATE_BACKEND_FILE_ASYNC=true STATE_BACKEND_ROCKSDB_INCREMENTAL=true 
> SIMULATE_FAILURE=false ...
> 2020-03-25T06:50:46.1754645Z Job (b8cb04e4b1e730585bc616aa352866d0) is 
> running.
> 2020-03-25T06:50:46.1758132Z Waiting for job 
> (b8cb04e4b1e730585bc616aa352866d0) to have at least 1 completed checkpoints 
> ...
> 2020-03-25T06:50:46.3478276Z Waiting for job to process up to 200 records, 
> current progress: 173 records ...
> 2020-03-25T06:50:49.6332988Z Cancelling job b8cb04e4b1e730585bc616aa352866d0.
> 2020-03-25T06:50:50.4875673Z Cancelled job b8cb04e4b1e730585bc616aa352866d0.
> 2020-03-25T06:50:50.5468230Z ls: cannot access 
> '/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304/externalized-chckpt-e2e-backend-dir/b8cb04e4b1e730585bc616aa352866d0/chk-[1-9]*/_metadata':
>  No such file or directory
> 2020-03-25T06:50:50.5606260Z Restoring job with externalized checkpoint at . 
> ...
> 2020-03-25T06:50:58.4728245Z 
> 2020-03-25T06:50:58.4732663Z 
> 
> 2020-03-25T06:50:58.4735785Z  The program finished with the following 
> exception:
> 2020-03-25T06:50:58.4737759Z 
> 2020-03-25T06:50:58.4742666Z 
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
> JobGraph.
> 2020-03-25T06:50:58.4746274Z  at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> 2020-03-25T06:50:58.4749954Z  at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> 2020-03-25T06:50:58.4752753Z  at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:142)
> 2020-03-25T06:50:58.4755400Z  at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:659)
> 2020-03-25T06:50:58.4757862Z  at 
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
> 2020-03-25T06:50:58.4760282Z  at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:890)
> 2020-03

[jira] [Comment Edited] (FLINK-16770) Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test fails with no such file

2020-04-14 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17083018#comment-17083018
 ] 

Biao Liu edited comment on FLINK-16770 at 4/14/20, 9:13 AM:


[~aljoscha], the upload to transfer.sh failed, so I can't confirm the root 
cause. It might be the same reason. [~yunta], do you need some help?


was (Author: sleepy):
[~aljoscha], the uploading to transfer.sh failed, I can't confirm the root 
cause. It might be the same reason.

> Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end 
> test fails with no such file
> ---
>
> Key: FLINK-16770
> URL: https://issues.apache.org/jira/browse/FLINK-16770
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Tests
>Affects Versions: 1.11.0
>Reporter: Zhijiang
>Assignee: Yun Tang
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.11.0
>
> Attachments: e2e-output.log, 
> flink-vsts-standalonesession-0-fv-az53.log
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The log : 
> [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6603&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5]
>  
> There was also the similar problem in 
> https://issues.apache.org/jira/browse/FLINK-16561, but for the case of no 
> parallelism change. And this case is for scaling up. Not quite sure whether 
> the root cause is the same one.
> {code:java}
> 2020-03-25T06:50:31.3894841Z Running 'Resuming Externalized Checkpoint 
> (rocks, incremental, scale up) end-to-end test'
> 2020-03-25T06:50:31.3895308Z 
> ==
> 2020-03-25T06:50:31.3907274Z TEST_DATA_DIR: 
> /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304
> 2020-03-25T06:50:31.5500274Z Flink dist directory: 
> /home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT
> 2020-03-25T06:50:31.6354639Z Starting cluster.
> 2020-03-25T06:50:31.8871932Z Starting standalonesession daemon on host 
> fv-az655.
> 2020-03-25T06:50:33.5021784Z Starting taskexecutor daemon on host fv-az655.
> 2020-03-25T06:50:33.5152274Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:34.5498116Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:35.6031346Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:36.9848425Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:38.0283377Z Dispatcher REST endpoint is up.
> 2020-03-25T06:50:38.0285490Z Running externalized checkpoints test, with 
> ORIGINAL_DOP=2 NEW_DOP=4 and STATE_BACKEND_TYPE=rocks 
> STATE_BACKEND_FILE_ASYNC=true STATE_BACKEND_ROCKSDB_INCREMENTAL=true 
> SIMULATE_FAILURE=false ...
> 2020-03-25T06:50:46.1754645Z Job (b8cb04e4b1e730585bc616aa352866d0) is 
> running.
> 2020-03-25T06:50:46.1758132Z Waiting for job 
> (b8cb04e4b1e730585bc616aa352866d0) to have at least 1 completed checkpoints 
> ...
> 2020-03-25T06:50:46.3478276Z Waiting for job to process up to 200 records, 
> current progress: 173 records ...
> 2020-03-25T06:50:49.6332988Z Cancelling job b8cb04e4b1e730585bc616aa352866d0.
> 2020-03-25T06:50:50.4875673Z Cancelled job b8cb04e4b1e730585bc616aa352866d0.
> 2020-03-25T06:50:50.5468230Z ls: cannot access 
> '/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304/externalized-chckpt-e2e-backend-dir/b8cb04e4b1e730585bc616aa352866d0/chk-[1-9]*/_metadata':
>  No such file or directory
> 2020-03-25T06:50:50.5606260Z Restoring job with externalized checkpoint at . 
> ...
> 2020-03-25T06:50:58.4728245Z 
> 2020-03-25T06:50:58.4732663Z 
> 
> 2020-03-25T06:50:58.4735785Z  The program finished with the following 
> exception:
> 2020-03-25T06:50:58.4737759Z 
> 2020-03-25T06:50:58.4742666Z 
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
> JobGraph.
> 2020-03-25T06:50:58.4746274Z  at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> 2020-03-25T06:50:58.4749954Z  at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> 2020-03-25T06:50:58.4752753Z  at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:142)
> 2020-03-25T06:50:58.4755400Z  at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:659

[jira] [Commented] (FLINK-16770) Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test fails with no such file

2020-04-14 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17083018#comment-17083018
 ] 

Biao Liu commented on FLINK-16770:
--

[~aljoscha], the upload to transfer.sh failed, so I can't confirm the root 
cause. It might be the same reason.

> Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end 
> test fails with no such file
> ---
>
> Key: FLINK-16770
> URL: https://issues.apache.org/jira/browse/FLINK-16770
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Tests
>Affects Versions: 1.11.0
>Reporter: Zhijiang
>Assignee: Yun Tang
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.11.0
>
> Attachments: e2e-output.log, 
> flink-vsts-standalonesession-0-fv-az53.log
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The log : 
> [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6603&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5]
>  
> There was also the similar problem in 
> https://issues.apache.org/jira/browse/FLINK-16561, but for the case of no 
> parallelism change. And this case is for scaling up. Not quite sure whether 
> the root cause is the same one.
> {code:java}
> 2020-03-25T06:50:31.3894841Z Running 'Resuming Externalized Checkpoint 
> (rocks, incremental, scale up) end-to-end test'
> 2020-03-25T06:50:31.3895308Z 
> ==
> 2020-03-25T06:50:31.3907274Z TEST_DATA_DIR: 
> /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304
> 2020-03-25T06:50:31.5500274Z Flink dist directory: 
> /home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT
> 2020-03-25T06:50:31.6354639Z Starting cluster.
> 2020-03-25T06:50:31.8871932Z Starting standalonesession daemon on host 
> fv-az655.
> 2020-03-25T06:50:33.5021784Z Starting taskexecutor daemon on host fv-az655.
> 2020-03-25T06:50:33.5152274Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:34.5498116Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:35.6031346Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:36.9848425Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:38.0283377Z Dispatcher REST endpoint is up.
> 2020-03-25T06:50:38.0285490Z Running externalized checkpoints test, with 
> ORIGINAL_DOP=2 NEW_DOP=4 and STATE_BACKEND_TYPE=rocks 
> STATE_BACKEND_FILE_ASYNC=true STATE_BACKEND_ROCKSDB_INCREMENTAL=true 
> SIMULATE_FAILURE=false ...
> 2020-03-25T06:50:46.1754645Z Job (b8cb04e4b1e730585bc616aa352866d0) is 
> running.
> 2020-03-25T06:50:46.1758132Z Waiting for job 
> (b8cb04e4b1e730585bc616aa352866d0) to have at least 1 completed checkpoints 
> ...
> 2020-03-25T06:50:46.3478276Z Waiting for job to process up to 200 records, 
> current progress: 173 records ...
> 2020-03-25T06:50:49.6332988Z Cancelling job b8cb04e4b1e730585bc616aa352866d0.
> 2020-03-25T06:50:50.4875673Z Cancelled job b8cb04e4b1e730585bc616aa352866d0.
> 2020-03-25T06:50:50.5468230Z ls: cannot access 
> '/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304/externalized-chckpt-e2e-backend-dir/b8cb04e4b1e730585bc616aa352866d0/chk-[1-9]*/_metadata':
>  No such file or directory
> 2020-03-25T06:50:50.5606260Z Restoring job with externalized checkpoint at . 
> ...
> 2020-03-25T06:50:58.4728245Z 
> 2020-03-25T06:50:58.4732663Z 
> 
> 2020-03-25T06:50:58.4735785Z  The program finished with the following 
> exception:
> 2020-03-25T06:50:58.4737759Z 
> 2020-03-25T06:50:58.4742666Z 
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
> JobGraph.
> 2020-03-25T06:50:58.4746274Z  at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> 2020-03-25T06:50:58.4749954Z  at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> 2020-03-25T06:50:58.4752753Z  at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:142)
> 2020-03-25T06:50:58.4755400Z  at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:659)
> 2020-03-25T06:50:58.4757862Z  at 
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
> 2020-03-25T06:50:58.4760282Z  at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:890

[jira] [Commented] (FLINK-16770) Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test fails with no such file

2020-04-08 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17078067#comment-17078067
 ] 

Biao Liu commented on FLINK-16770:
--

To [~rmetzger], I think FLINK-16423 and this ticket fail in the same scenario. In 
short, the atomicity of finalizing a checkpoint is broken.
I wrote a comment in FLINK-16423.

> Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end 
> test fails with no such file
> ---
>
> Key: FLINK-16770
> URL: https://issues.apache.org/jira/browse/FLINK-16770
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Tests
>Affects Versions: 1.11.0
>Reporter: Zhijiang
>Assignee: Yun Tang
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.11.0
>
> Attachments: e2e-output.log, 
> flink-vsts-standalonesession-0-fv-az53.log
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The log : 
> [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6603&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5]
>  
> There was also the similar problem in 
> https://issues.apache.org/jira/browse/FLINK-16561, but for the case of no 
> parallelism change. And this case is for scaling up. Not quite sure whether 
> the root cause is the same one.
> {code:java}
> 2020-03-25T06:50:31.3894841Z Running 'Resuming Externalized Checkpoint 
> (rocks, incremental, scale up) end-to-end test'
> 2020-03-25T06:50:31.3895308Z 
> ==
> 2020-03-25T06:50:31.3907274Z TEST_DATA_DIR: 
> /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304
> 2020-03-25T06:50:31.5500274Z Flink dist directory: 
> /home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT
> 2020-03-25T06:50:31.6354639Z Starting cluster.
> 2020-03-25T06:50:31.8871932Z Starting standalonesession daemon on host 
> fv-az655.
> 2020-03-25T06:50:33.5021784Z Starting taskexecutor daemon on host fv-az655.
> 2020-03-25T06:50:33.5152274Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:34.5498116Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:35.6031346Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:36.9848425Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:38.0283377Z Dispatcher REST endpoint is up.
> 2020-03-25T06:50:38.0285490Z Running externalized checkpoints test, with 
> ORIGINAL_DOP=2 NEW_DOP=4 and STATE_BACKEND_TYPE=rocks 
> STATE_BACKEND_FILE_ASYNC=true STATE_BACKEND_ROCKSDB_INCREMENTAL=true 
> SIMULATE_FAILURE=false ...
> 2020-03-25T06:50:46.1754645Z Job (b8cb04e4b1e730585bc616aa352866d0) is 
> running.
> 2020-03-25T06:50:46.1758132Z Waiting for job 
> (b8cb04e4b1e730585bc616aa352866d0) to have at least 1 completed checkpoints 
> ...
> 2020-03-25T06:50:46.3478276Z Waiting for job to process up to 200 records, 
> current progress: 173 records ...
> 2020-03-25T06:50:49.6332988Z Cancelling job b8cb04e4b1e730585bc616aa352866d0.
> 2020-03-25T06:50:50.4875673Z Cancelled job b8cb04e4b1e730585bc616aa352866d0.
> 2020-03-25T06:50:50.5468230Z ls: cannot access 
> '/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304/externalized-chckpt-e2e-backend-dir/b8cb04e4b1e730585bc616aa352866d0/chk-[1-9]*/_metadata':
>  No such file or directory
> 2020-03-25T06:50:50.5606260Z Restoring job with externalized checkpoint at . 
> ...
> 2020-03-25T06:50:58.4728245Z 
> 2020-03-25T06:50:58.4732663Z 
> 
> 2020-03-25T06:50:58.4735785Z  The program finished with the following 
> exception:
> 2020-03-25T06:50:58.4737759Z 
> 2020-03-25T06:50:58.4742666Z 
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
> JobGraph.
> 2020-03-25T06:50:58.4746274Z  at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> 2020-03-25T06:50:58.4749954Z  at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> 2020-03-25T06:50:58.4752753Z  at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:142)
> 2020-03-25T06:50:58.4755400Z  at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:659)
> 2020-03-25T06:50:58.4757862Z  at 
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
> 2020-03-25T06:50:58.4760282Z  at 
> org.apache.fl

[jira] [Comment Edited] (FLINK-16423) test_ha_per_job_cluster_datastream.sh gets stuck

2020-04-08 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17078057#comment-17078057
 ] 

Biao Liu edited comment on FLINK-16423 at 4/8/20, 10:56 AM:


Thanks [~rmetzger] for analyzing so deeply. I checked the attached log. I 
believe the scenario is the same as FLINK-16770. The problem happens while 
checkpoint 9 is being finalized. We can see that the 
{{CheckpointCoordinator}} tried to recover from checkpoint 9, so checkpoint 
9 must have been added to the {{CompletedCheckpointStore}}. However, we can't find the 
log line "Completed checkpoint 9 ...". It must have failed after being added to the 
{{CompletedCheckpointStore}}, e.g. being aborted due to the "artificial 
failure". Regarding "where are checkpoints 6, 7, 8": since we only keep 1 
successful checkpoint in the {{CompletedCheckpointStore}}, they must have been subsumed 
when checkpoint 9 was added to the {{CompletedCheckpointStore}}.

The work-around fix of FLINK-16770 so far is to keep 2 successful 
checkpoints in the {{CompletedCheckpointStore}} for these cases (see the snippet 
below). So even if checkpoint 9 doesn't finish finalization, there should be at least 
checkpoint 8 left.

If it gets stuck quite frequently, we could apply the work-around fix for 
this case as well. However, this bug has to be fixed completely before releasing 1.11.
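
For reference, this is the knob that controls how many completed checkpoints 
the store retains, assuming the work-around is applied via configuration (the 
e2e test scripts may set it differently):
{code}
# flink-conf.yaml
state.checkpoints.num-retained: 2
{code}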


was (Author: sleepy):
Thanks [~rmetzger] for analyzing so deeply. I checked the attached log. I 
believe the scenario is same with FLINK-16770. The problem happens when 
checkpoint 9 is doing finalization. We can see that the 
{{CheckpointCoordinator}} tried to recover from checkpoint 9. So the checkpoint 
9 must be added into {{CompletedCheckpointStore}}. However we can't find the 
log "Completed checkpoint 9 ...". It must failed after being added into 
{{CompletedCheckpointStore}}, like being aborted due to the "artificial 
failure". Regarding to "where is the checkpoint 6, 7, 8", since we only keep 1 
successful checkpoint in {{CompletedCheckpointStore}}, they must be subsumed 
when checkpoint 9 was adding into {{CompletedCheckpointStore}}. 

The work-around fixing so far of FLINK-16770 is that keeping 2 successful 
checkpoints in {{CompletedCheckpointStore}] for these cases. So even if 
checkpoint 9 doesn't finish the finalization, there should be at least 
checkpoint 8 existing.

If it gets stuck quite frequently, we could apply the work-around fixing for 
the case. However this bug has to be fixed completely before releasing 1.11.

> test_ha_per_job_cluster_datastream.sh gets stuck
> 
>
> Key: FLINK-16423
> URL: https://issues.apache.org/jira/browse/FLINK-16423
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Tests
>Affects Versions: 1.11.0
>Reporter: Robert Metzger
>Assignee: Robert Metzger
>Priority: Blocker
> Attachments: 20200408.1.tgz
>
>
> This was seen in 
> https://dev.azure.com/rmetzger/Flink/_build/results?buildId=5905&view=logs&j=b1623ac9-0979-5b0d-2e5e-1377d695c991&t=e7804547-1789-5225-2bcf-269eeaa37447
>  ... the relevant part of the logs is here:
> {code}
> 2020-03-04T11:27:25.4819486Z 
> ==
> 2020-03-04T11:27:25.4820470Z Running 'Running HA per-job cluster (rocks, 
> non-incremental) end-to-end test'
> 2020-03-04T11:27:25.4820922Z 
> ==
> 2020-03-04T11:27:25.4840177Z TEST_DATA_DIR: 
> /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-25482960156
> 2020-03-04T11:27:25.6712478Z Flink dist directory: 
> /home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT
> 2020-03-04T11:27:25.6830402Z Flink dist directory: 
> /home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT
> 2020-03-04T11:27:26.2988914Z Starting zookeeper daemon on host fv-az655.
> 2020-03-04T11:27:26.3001237Z Running on HA mode: parallelism=4, 
> backend=rocks, asyncSnapshots=true, and incremSnapshots=false.
> 2020-03-04T11:27:27.4206924Z Starting standalonejob daemon on host fv-az655.
> 2020-03-04T11:27:27.4217066Z Start 1 more task managers
> 2020-03-04T11:27:30.8412541Z Starting taskexecutor daemon on host fv-az655.
> 2020-03-04T11:27:38.1779980Z Job () is 
> running.
> 2020-03-04T11:27:38.1781375Z Running JM watchdog @ 89778
> 2020-03-04T11:27:38.1781858Z Running TM watchdog @ 89779
> 2020-03-04T11:27:38.1783272Z Waiting for text Completed checkpoint [1-9]* for 
> job  to appear 2 of times in logs...
> 2020-03-04T13:21:29.9076797Z ##[error]The operation was canceled.
> 2020-03-04T13:21:29.9094090Z ##[section]Finishing: Run e2e tests
> {code}
> The l

[jira] [Commented] (FLINK-16423) test_ha_per_job_cluster_datastream.sh gets stuck

2020-04-08 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17078057#comment-17078057
 ] 

Biao Liu commented on FLINK-16423:
--

Thanks [~rmetzger] for analyzing so deeply. I checked the attached log. I 
believe the scenario is the same as FLINK-16770. The problem happens while 
checkpoint 9 is being finalized. We can see that the 
{{CheckpointCoordinator}} tried to recover from checkpoint 9, so checkpoint 
9 must have been added to the {{CompletedCheckpointStore}}. However, we can't find the 
log line "Completed checkpoint 9 ...". It must have failed after being added to the 
{{CompletedCheckpointStore}}, e.g. being aborted due to the "artificial 
failure". Regarding "where are checkpoints 6, 7, 8": since we only keep 1 
successful checkpoint in the {{CompletedCheckpointStore}}, they must have been subsumed 
when checkpoint 9 was added to the {{CompletedCheckpointStore}}.

The work-around fix of FLINK-16770 so far is to keep 2 successful 
checkpoints in the {{CompletedCheckpointStore}} for these cases. So even if 
checkpoint 9 doesn't finish finalization, there should be at least 
checkpoint 8 left.

If it gets stuck quite frequently, we could apply the work-around fix for 
this case as well. However, this bug has to be fixed completely before releasing 1.11.

> test_ha_per_job_cluster_datastream.sh gets stuck
> 
>
> Key: FLINK-16423
> URL: https://issues.apache.org/jira/browse/FLINK-16423
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Tests
>Affects Versions: 1.11.0
>Reporter: Robert Metzger
>Assignee: Robert Metzger
>Priority: Blocker
> Attachments: 20200408.1.tgz
>
>
> This was seen in 
> https://dev.azure.com/rmetzger/Flink/_build/results?buildId=5905&view=logs&j=b1623ac9-0979-5b0d-2e5e-1377d695c991&t=e7804547-1789-5225-2bcf-269eeaa37447
>  ... the relevant part of the logs is here:
> {code}
> 2020-03-04T11:27:25.4819486Z 
> ==
> 2020-03-04T11:27:25.4820470Z Running 'Running HA per-job cluster (rocks, 
> non-incremental) end-to-end test'
> 2020-03-04T11:27:25.4820922Z 
> ==
> 2020-03-04T11:27:25.4840177Z TEST_DATA_DIR: 
> /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-25482960156
> 2020-03-04T11:27:25.6712478Z Flink dist directory: 
> /home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT
> 2020-03-04T11:27:25.6830402Z Flink dist directory: 
> /home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT
> 2020-03-04T11:27:26.2988914Z Starting zookeeper daemon on host fv-az655.
> 2020-03-04T11:27:26.3001237Z Running on HA mode: parallelism=4, 
> backend=rocks, asyncSnapshots=true, and incremSnapshots=false.
> 2020-03-04T11:27:27.4206924Z Starting standalonejob daemon on host fv-az655.
> 2020-03-04T11:27:27.4217066Z Start 1 more task managers
> 2020-03-04T11:27:30.8412541Z Starting taskexecutor daemon on host fv-az655.
> 2020-03-04T11:27:38.1779980Z Job () is 
> running.
> 2020-03-04T11:27:38.1781375Z Running JM watchdog @ 89778
> 2020-03-04T11:27:38.1781858Z Running TM watchdog @ 89779
> 2020-03-04T11:27:38.1783272Z Waiting for text Completed checkpoint [1-9]* for 
> job  to appear 2 of times in logs...
> 2020-03-04T13:21:29.9076797Z ##[error]The operation was canceled.
> 2020-03-04T13:21:29.9094090Z ##[section]Finishing: Run e2e tests
> {code}
> The last three lines indicate that the test is waiting forever for a 
> checkpoint to appear.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16770) Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test fails with no such file

2020-04-06 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17076161#comment-17076161
 ] 

Biao Liu commented on FLINK-16770:
--

Thanks [~rmetzger] for manually verifying and merging the PR. 

> Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end 
> test fails with no such file
> ---
>
> Key: FLINK-16770
> URL: https://issues.apache.org/jira/browse/FLINK-16770
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Tests
>Affects Versions: 1.11.0
>Reporter: Zhijiang
>Assignee: Yun Tang
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.11.0
>
> Attachments: e2e-output.log, 
> flink-vsts-standalonesession-0-fv-az53.log
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The log : 
> [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6603&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5]
>  
> There was also the similar problem in 
> https://issues.apache.org/jira/browse/FLINK-16561, but for the case of no 
> parallelism change. And this case is for scaling up. Not quite sure whether 
> the root cause is the same one.
> {code:java}
> 2020-03-25T06:50:31.3894841Z Running 'Resuming Externalized Checkpoint 
> (rocks, incremental, scale up) end-to-end test'
> 2020-03-25T06:50:31.3895308Z 
> ==
> 2020-03-25T06:50:31.3907274Z TEST_DATA_DIR: 
> /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304
> 2020-03-25T06:50:31.5500274Z Flink dist directory: 
> /home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT
> 2020-03-25T06:50:31.6354639Z Starting cluster.
> 2020-03-25T06:50:31.8871932Z Starting standalonesession daemon on host 
> fv-az655.
> 2020-03-25T06:50:33.5021784Z Starting taskexecutor daemon on host fv-az655.
> 2020-03-25T06:50:33.5152274Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:34.5498116Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:35.6031346Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:36.9848425Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:38.0283377Z Dispatcher REST endpoint is up.
> 2020-03-25T06:50:38.0285490Z Running externalized checkpoints test, with 
> ORIGINAL_DOP=2 NEW_DOP=4 and STATE_BACKEND_TYPE=rocks 
> STATE_BACKEND_FILE_ASYNC=true STATE_BACKEND_ROCKSDB_INCREMENTAL=true 
> SIMULATE_FAILURE=false ...
> 2020-03-25T06:50:46.1754645Z Job (b8cb04e4b1e730585bc616aa352866d0) is 
> running.
> 2020-03-25T06:50:46.1758132Z Waiting for job 
> (b8cb04e4b1e730585bc616aa352866d0) to have at least 1 completed checkpoints 
> ...
> 2020-03-25T06:50:46.3478276Z Waiting for job to process up to 200 records, 
> current progress: 173 records ...
> 2020-03-25T06:50:49.6332988Z Cancelling job b8cb04e4b1e730585bc616aa352866d0.
> 2020-03-25T06:50:50.4875673Z Cancelled job b8cb04e4b1e730585bc616aa352866d0.
> 2020-03-25T06:50:50.5468230Z ls: cannot access 
> '/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304/externalized-chckpt-e2e-backend-dir/b8cb04e4b1e730585bc616aa352866d0/chk-[1-9]*/_metadata':
>  No such file or directory
> 2020-03-25T06:50:50.5606260Z Restoring job with externalized checkpoint at . 
> ...
> 2020-03-25T06:50:58.4728245Z 
> 2020-03-25T06:50:58.4732663Z 
> 
> 2020-03-25T06:50:58.4735785Z  The program finished with the following 
> exception:
> 2020-03-25T06:50:58.4737759Z 
> 2020-03-25T06:50:58.4742666Z 
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
> JobGraph.
> 2020-03-25T06:50:58.4746274Z  at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> 2020-03-25T06:50:58.4749954Z  at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> 2020-03-25T06:50:58.4752753Z  at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:142)
> 2020-03-25T06:50:58.4755400Z  at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:659)
> 2020-03-25T06:50:58.4757862Z  at 
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
> 2020-03-25T06:50:58.4760282Z  at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:890)
> 2020-03-25T06:50:58.4763591Z  at 
> org.apach

[jira] [Commented] (FLINK-16770) Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test fails with no such file

2020-04-03 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17074438#comment-17074438
 ] 

Biao Liu commented on FLINK-16770:
--

After a short discussion with [~yunta] offline, we reached agreement on a 
possible solution. [~yunta] will continue working on it.

Besides that, we think it's better to quickly fix the failing case first, so 
that others can avoid suffering from this unstable failure. I have created a 
PR that tries to resolve the failing case with a work-around. [~yunta], could 
you take a look and check whether anything is missing?

> Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end 
> test fails with no such file
> ---
>
> Key: FLINK-16770
> URL: https://issues.apache.org/jira/browse/FLINK-16770
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Tests
>Affects Versions: 1.11.0
>Reporter: Zhijiang
>Assignee: Yun Tang
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.11.0
>
> Attachments: e2e-output.log, 
> flink-vsts-standalonesession-0-fv-az53.log
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The log : 
> [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6603&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5]
>  
> There was also the similar problem in 
> https://issues.apache.org/jira/browse/FLINK-16561, but for the case of no 
> parallelism change. And this case is for scaling up. Not quite sure whether 
> the root cause is the same one.
> {code:java}
> 2020-03-25T06:50:31.3894841Z Running 'Resuming Externalized Checkpoint 
> (rocks, incremental, scale up) end-to-end test'
> 2020-03-25T06:50:31.3895308Z 
> ==
> 2020-03-25T06:50:31.3907274Z TEST_DATA_DIR: 
> /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304
> 2020-03-25T06:50:31.5500274Z Flink dist directory: 
> /home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT
> 2020-03-25T06:50:31.6354639Z Starting cluster.
> 2020-03-25T06:50:31.8871932Z Starting standalonesession daemon on host 
> fv-az655.
> 2020-03-25T06:50:33.5021784Z Starting taskexecutor daemon on host fv-az655.
> 2020-03-25T06:50:33.5152274Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:34.5498116Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:35.6031346Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:36.9848425Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:38.0283377Z Dispatcher REST endpoint is up.
> 2020-03-25T06:50:38.0285490Z Running externalized checkpoints test, with 
> ORIGINAL_DOP=2 NEW_DOP=4 and STATE_BACKEND_TYPE=rocks 
> STATE_BACKEND_FILE_ASYNC=true STATE_BACKEND_ROCKSDB_INCREMENTAL=true 
> SIMULATE_FAILURE=false ...
> 2020-03-25T06:50:46.1754645Z Job (b8cb04e4b1e730585bc616aa352866d0) is 
> running.
> 2020-03-25T06:50:46.1758132Z Waiting for job 
> (b8cb04e4b1e730585bc616aa352866d0) to have at least 1 completed checkpoints 
> ...
> 2020-03-25T06:50:46.3478276Z Waiting for job to process up to 200 records, 
> current progress: 173 records ...
> 2020-03-25T06:50:49.6332988Z Cancelling job b8cb04e4b1e730585bc616aa352866d0.
> 2020-03-25T06:50:50.4875673Z Cancelled job b8cb04e4b1e730585bc616aa352866d0.
> 2020-03-25T06:50:50.5468230Z ls: cannot access 
> '/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304/externalized-chckpt-e2e-backend-dir/b8cb04e4b1e730585bc616aa352866d0/chk-[1-9]*/_metadata':
>  No such file or directory
> 2020-03-25T06:50:50.5606260Z Restoring job with externalized checkpoint at . 
> ...
> 2020-03-25T06:50:58.4728245Z 
> 2020-03-25T06:50:58.4732663Z 
> 
> 2020-03-25T06:50:58.4735785Z  The program finished with the following 
> exception:
> 2020-03-25T06:50:58.4737759Z 
> 2020-03-25T06:50:58.4742666Z 
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
> JobGraph.
> 2020-03-25T06:50:58.4746274Z  at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> 2020-03-25T06:50:58.4749954Z  at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> 2020-03-25T06:50:58.4752753Z  at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:142)
> 2020-03-25T06:50:58.4755400Z  at 

[jira] [Comment Edited] (FLINK-16931) Large _metadata file lead to JobManager not responding when restart

2020-04-02 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17073927#comment-17073927
 ] 

Biao Liu edited comment on FLINK-16931 at 4/2/20, 5:31 PM:
---

[~trohrmann], my pleasure :)

I could share some context here. We discussed this scenario before refactoring 
the whole threading model of {{CheckpointCoordinator}}, see FLINK-13497 and 
FLINK-13698. Although this scenario is not the cause of FLINK-13497, we think 
there is a risk of heartbeat timeout. At that time, we decided to treat it as a 
follow-up issue. However, we haven't filed any ticket for it yet.

After FLINK-13698, most of the non-IO operations of {{CheckpointCoordinator}} 
are executed in the main thread executor, except for the initialization part, 
which causes this problem. One of the final targets is to put all IO operations 
of {{CheckpointCoordinator}} into the IO thread executor and execute everything 
else in the main thread executor. To achieve this, some synchronous operations 
must be refactored into asynchronous ones. I think that's what we need to do 
here.
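
To illustrate the kind of refactoring meant here, a minimal, self-contained 
sketch with plain JDK executors rather than the actual {{CheckpointCoordinator}} 
code; the executor variables and the {{retrieveCheckpoint}} method are 
stand-ins, not real Flink APIs:

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncRetrievalSketch {

    // Hypothetical blocking call, e.g. downloading a large _metadata file from DFS.
    private static String retrieveCheckpoint(long checkpointId) {
        return "checkpoint-" + checkpointId;
    }

    public static void main(String[] args) {
        // Stand-ins for the JobMaster main thread executor and the IO executor.
        ExecutorService mainThreadExecutor = Executors.newSingleThreadExecutor();
        ExecutorService ioExecutor = Executors.newFixedThreadPool(4);

        CompletableFuture
            // The heavy IO runs in the IO executor, so the main thread keeps serving RPCs/heartbeats.
            .supplyAsync(() -> retrieveCheckpoint(50), ioExecutor)
            // Only the cheap bookkeeping is handed back to the main thread executor.
            .thenAcceptAsync(cp -> System.out.println("restored from " + cp), mainThreadExecutor)
            .join();

        mainThreadExecutor.shutdown();
        ioExecutor.shutdown();
    }
}
{code}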


was (Author: sleepy):
[~trohrmann], my pleasure :)

I could share some context here. We have discussed this scenario refactoring 
the whole threading model of {{CheckpointCoordinator}}, see FLINK-13497 and 
FLINK-13698. Although this scenario is not the cause of FLINK-13497, we think 
there is risk of heartbeat timeout. At that time, we decided to treat it as a 
follow-up issue. However we haven't file any ticket for it yet. 

After FLINK-13698, most parts of the non-IO operations of 
{{CheckpointCoordinator}} are executed in main thread executor, except the 
initialization part which causes this problem. One of the final targets is 
putting all IO operations of {{CheckpointCoordinator}} into IO thread executor, 
others are executed in main thread executor. To achieve this, some synchronous 
operations must be refactored into asynchronous ways. I think that's what we 
need to do here. 

> Large _metadata file lead to JobManager not responding when restart
> ---
>
> Key: FLINK-16931
> URL: https://issues.apache.org/jira/browse/FLINK-16931
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / Coordination
>Affects Versions: 1.9.2, 1.10.0, 1.11.0
>Reporter: Lu Niu
>Priority: Critical
> Fix For: 1.11.0
>
>
> When _metadata file is big, JobManager could never recover from checkpoint. 
> It fall into a loop that fetch checkpoint -> JM timeout -> restart. Here is 
> related log: 
> {code:java}
>  2020-04-01 17:08:25,689 INFO 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - 
> Recovering checkpoints from ZooKeeper.
>  2020-04-01 17:08:25,698 INFO 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Found 
> 3 checkpoints in ZooKeeper.
>  2020-04-01 17:08:25,698 INFO 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - 
> Trying to fetch 3 checkpoints from storage.
>  2020-04-01 17:08:25,698 INFO 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - 
> Trying to retrieve checkpoint 50.
>  2020-04-01 17:08:48,589 INFO 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - 
> Trying to retrieve checkpoint 51.
>  2020-04-01 17:09:12,775 INFO org.apache.flink.yarn.YarnResourceManager - The 
> heartbeat of JobManager with id 02500708baf0bb976891c391afd3d7d5 timed out.
> {code}
> Digging into the code, looks like ExecutionGraph::restart runs in JobMaster 
> main thread and finally calls 
> ZooKeeperCompletedCheckpointStore::retrieveCompletedCheckpoint which download 
> file form DFS. The main thread is basically blocked for a while because of 
> this. One possible solution is to making the downloading part async. More 
> things might need to consider as the original change tries to make it 
> single-threaded. [https://github.com/apache/flink/pull/7568]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-16931) Large _metadata file lead to JobManager not responding when restart

2020-04-02 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17073927#comment-17073927
 ] 

Biao Liu edited comment on FLINK-16931 at 4/2/20, 5:31 PM:
---

[~trohrmann], my pleasure :)

I could share some context here. We have discussed this scenario before 
refactoring the whole threading model of {{CheckpointCoordinator}}, see 
FLINK-13497 and FLINK-13698. Although this scenario is not the cause of 
FLINK-13497, we think there is a risk of heartbeat timeout. At that time, we 
decided to treat it as a follow-up issue. However we haven't file any ticket 
for it yet.

After FLINK-13698, most parts of the non-IO operations of 
{{CheckpointCoordinator}} are executed in main thread executor, except the 
initialization part which causes this problem. One of the final targets is 
putting all IO operations of {{CheckpointCoordinator}} into IO thread executor, 
others are executed in main thread executor. To achieve this, some synchronous 
operations must be refactored into asynchronous ways. I think that's what we 
need to do here.


was (Author: sleepy):
[~trohrmann], my pleasure :)

I could share some context here. We have discussed this scenario before 
refactoring the whole threading model of {{CheckpointCoordinator}}, see 
FLINK-13497 and FLINK-13698. Although this scenario is not the cause of 
FLINK-13497, we think there is risk of heartbeat timeout. At that time, we 
decided to treat it as a follow-up issue. However we haven't file any ticket 
for it yet.

After FLINK-13698, most parts of the non-IO operations of 
{{CheckpointCoordinator}} are executed in main thread executor, except the 
initialization part which causes this problem. One of the final targets is 
putting all IO operations of {{CheckpointCoordinator}} into IO thread executor, 
others are executed in main thread executor. To achieve this, some synchronous 
operations must be refactored into asynchronous ways. I think that's what we 
need to do here.

> Large _metadata file lead to JobManager not responding when restart
> ---
>
> Key: FLINK-16931
> URL: https://issues.apache.org/jira/browse/FLINK-16931
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / Coordination
>Affects Versions: 1.9.2, 1.10.0, 1.11.0
>Reporter: Lu Niu
>Priority: Critical
> Fix For: 1.11.0
>
>
> When _metadata file is big, JobManager could never recover from checkpoint. 
> It fall into a loop that fetch checkpoint -> JM timeout -> restart. Here is 
> related log: 
> {code:java}
>  2020-04-01 17:08:25,689 INFO 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - 
> Recovering checkpoints from ZooKeeper.
>  2020-04-01 17:08:25,698 INFO 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Found 
> 3 checkpoints in ZooKeeper.
>  2020-04-01 17:08:25,698 INFO 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - 
> Trying to fetch 3 checkpoints from storage.
>  2020-04-01 17:08:25,698 INFO 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - 
> Trying to retrieve checkpoint 50.
>  2020-04-01 17:08:48,589 INFO 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - 
> Trying to retrieve checkpoint 51.
>  2020-04-01 17:09:12,775 INFO org.apache.flink.yarn.YarnResourceManager - The 
> heartbeat of JobManager with id 02500708baf0bb976891c391afd3d7d5 timed out.
> {code}
> Digging into the code, looks like ExecutionGraph::restart runs in JobMaster 
> main thread and finally calls 
> ZooKeeperCompletedCheckpointStore::retrieveCompletedCheckpoint which download 
> file form DFS. The main thread is basically blocked for a while because of 
> this. One possible solution is to making the downloading part async. More 
> things might need to consider as the original change tries to make it 
> single-threaded. [https://github.com/apache/flink/pull/7568]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16931) Large _metadata file lead to JobManager not responding when restart

2020-04-02 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17073927#comment-17073927
 ] 

Biao Liu commented on FLINK-16931:
--

[~trohrmann], my pleasure :)

I could share some context here. We have discussed this scenario refactoring 
the whole threading model of {{CheckpointCoordinator}}, see FLINK-13497 and 
FLINK-13698. Although this scenario is not the cause of FLINK-13497, we think 
there is risk of heartbeat timeout. At that time, we decided to treat it as a 
follow-up issue. However we haven't file any ticket for it yet. 

After FLINK-13698, most parts of the non-IO operations of 
{{CheckpointCoordinator}} are executed in main thread executor, except the 
initialization part which causes this problem. One of the final targets is 
putting all IO operations of {{CheckpointCoordinator}} into IO thread executor, 
others are executed in main thread executor. To achieve this, some synchronous 
operations must be refactored into asynchronous ways. I think that's what we 
need to do here. 

> Large _metadata file lead to JobManager not responding when restart
> ---
>
> Key: FLINK-16931
> URL: https://issues.apache.org/jira/browse/FLINK-16931
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / Coordination
>Affects Versions: 1.9.2, 1.10.0, 1.11.0
>Reporter: Lu Niu
>Priority: Critical
> Fix For: 1.11.0
>
>
> When _metadata file is big, JobManager could never recover from checkpoint. 
> It fall into a loop that fetch checkpoint -> JM timeout -> restart. Here is 
> related log: 
> {code:java}
>  2020-04-01 17:08:25,689 INFO 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - 
> Recovering checkpoints from ZooKeeper.
>  2020-04-01 17:08:25,698 INFO 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Found 
> 3 checkpoints in ZooKeeper.
>  2020-04-01 17:08:25,698 INFO 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - 
> Trying to fetch 3 checkpoints from storage.
>  2020-04-01 17:08:25,698 INFO 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - 
> Trying to retrieve checkpoint 50.
>  2020-04-01 17:08:48,589 INFO 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - 
> Trying to retrieve checkpoint 51.
>  2020-04-01 17:09:12,775 INFO org.apache.flink.yarn.YarnResourceManager - The 
> heartbeat of JobManager with id 02500708baf0bb976891c391afd3d7d5 timed out.
> {code}
> Digging into the code, looks like ExecutionGraph::restart runs in JobMaster 
> main thread and finally calls 
> ZooKeeperCompletedCheckpointStore::retrieveCompletedCheckpoint which download 
> file form DFS. The main thread is basically blocked for a while because of 
> this. One possible solution is to making the downloading part async. More 
> things might need to consider as the original change tries to make it 
> single-threaded. [https://github.com/apache/flink/pull/7568]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16945) Execute CheckpointFailureManager.FailJobCallback directly in main thread executor

2020-04-02 Thread Biao Liu (Jira)
Biao Liu created FLINK-16945:


 Summary: Execute CheckpointFailureManager.FailJobCallback directly 
in main thread executor
 Key: FLINK-16945
 URL: https://issues.apache.org/jira/browse/FLINK-16945
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Affects Versions: 1.10.0
Reporter: Biao Liu
 Fix For: 1.11.0


Since we have put all non-IO operations of {{CheckpointCoordinator}} into the 
main thread executor, the {{CheckpointFailureManager.FailJobCallback}} could be 
executed directly now. In this way the execution graph would fail immediately 
when {{CheckpointFailureManager}} invokes the callback, and we could avoid the 
inconsistent scenario of FLINK-13497.
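
As a rough, hypothetical illustration (the class and method names below are 
made up, not the real Flink ones): once the failure handling already runs in 
the main thread executor, the callback can be invoked directly instead of 
being re-scheduled onto another executor.

{code:java}
import java.util.concurrent.Executor;

public class DirectCallbackSketch {

    interface FailJobCallback {
        void failJob(Throwable cause);
    }

    // Before: the callback was handed to some other executor, leaving a window
    // in which the coordinator could keep acting on an already doomed job.
    static void failAsync(Executor executor, FailJobCallback callback, Throwable cause) {
        executor.execute(() -> callback.failJob(cause));
    }

    // After: the coordinator already runs in the main thread executor, so the
    // callback can be invoked directly and the job fails immediately.
    static void failDirectly(FailJobCallback callback, Throwable cause) {
        callback.failJob(cause);
    }

    public static void main(String[] args) {
        FailJobCallback callback =
            cause -> System.out.println("failing job: " + cause.getMessage());
        failDirectly(callback, new RuntimeException("too many checkpoint failures"));
    }
}
{code}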



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16561) Resuming Externalized Checkpoint (rocks, incremental, no parallelism change) end-to-end test fails on Azure

2020-04-02 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17073441#comment-17073441
 ] 

Biao Liu commented on FLINK-16561:
--

[~rmetzger], I have closed this ticket, let's discuss under FLINK-16770.

> Resuming Externalized Checkpoint (rocks, incremental, no parallelism change) 
> end-to-end test fails on Azure
> ---
>
> Key: FLINK-16561
> URL: https://issues.apache.org/jira/browse/FLINK-16561
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Tests
>Affects Versions: 1.11.0
>Reporter: Biao Liu
>Assignee: Yun Tang
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.11.0
>
>
> {quote}Caused by: java.io.IOException: Cannot access file system for 
> checkpoint/savepoint path 'file://.'.
>   at 
> org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpointPointer(AbstractFsCheckpointStorage.java:233)
>   at 
> org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpoint(AbstractFsCheckpointStorage.java:110)
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1332)
>   at 
> org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:314)
>   at 
> org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:247)
>   at 
> org.apache.flink.runtime.scheduler.SchedulerBase.(SchedulerBase.java:223)
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.(DefaultScheduler.java:118)
>   at 
> org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103)
>   at 
> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:281)
>   at 
> org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:269)
>   at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
>   at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
>   at 
> org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.(JobManagerRunnerImpl.java:146)
>   ... 10 more
> Caused by: java.io.IOException: Found local file path with authority '.' in 
> path 'file://.'. Hint: Did you forget a slash? (correct path would be 
> 'file:///.')
>   at 
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:441)
>   at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:389)
>   at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
>   at 
> org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpointPointer(AbstractFsCheckpointStorage.java:230)
>   ... 22 more
> {quote}
> The original log is here, 
> https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6073&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=2b7514ee-e706-5046-657b-3430666e7bd9
> There are some similar tickets about this case, but the stack here looks 
> different. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-16561) Resuming Externalized Checkpoint (rocks, incremental, no parallelism change) end-to-end test fails on Azure

2020-04-02 Thread Biao Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu closed FLINK-16561.

Resolution: Duplicate

> Resuming Externalized Checkpoint (rocks, incremental, no parallelism change) 
> end-to-end test fails on Azure
> ---
>
> Key: FLINK-16561
> URL: https://issues.apache.org/jira/browse/FLINK-16561
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Tests
>Affects Versions: 1.11.0
>Reporter: Biao Liu
>Assignee: Yun Tang
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.11.0
>
>
> {quote}Caused by: java.io.IOException: Cannot access file system for 
> checkpoint/savepoint path 'file://.'.
>   at 
> org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpointPointer(AbstractFsCheckpointStorage.java:233)
>   at 
> org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpoint(AbstractFsCheckpointStorage.java:110)
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1332)
>   at 
> org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:314)
>   at 
> org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:247)
>   at 
> org.apache.flink.runtime.scheduler.SchedulerBase.(SchedulerBase.java:223)
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.(DefaultScheduler.java:118)
>   at 
> org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103)
>   at 
> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:281)
>   at 
> org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:269)
>   at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
>   at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
>   at 
> org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.(JobManagerRunnerImpl.java:146)
>   ... 10 more
> Caused by: java.io.IOException: Found local file path with authority '.' in 
> path 'file://.'. Hint: Did you forget a slash? (correct path would be 
> 'file:///.')
>   at 
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:441)
>   at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:389)
>   at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
>   at 
> org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpointPointer(AbstractFsCheckpointStorage.java:230)
>   ... 22 more
> {quote}
> The original log is here, 
> https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6073&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=2b7514ee-e706-5046-657b-3430666e7bd9
> There are some similar tickets about this case, but the stack here looks 
> different. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-16770) Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test fails with no such file

2020-04-01 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17073388#comment-17073388
 ] 

Biao Liu edited comment on FLINK-16770 at 4/2/20, 5:52 AM:
---

Hi [~yunta], thanks for the response. If I understand correctly, 
{{CompletedCheckpointStore}} can end up in an inconsistent state while stopping 
a checkpoint that is doing asynchronous finalization.

There are two strategies here (a small sketch of the second one follows below),
 1. The checkpoint which is doing finalization could be aborted when 
{{CheckpointCoordinator}} is being shut down or the periodic scheduler is being 
stopped. This is the choice of the current implementation. However, we didn't 
handle the {{CompletedCheckpointStore}} well. For example, it might be better 
to revert the state of {{CompletedCheckpointStore}} when the 
{{PendingCheckpoint}} discovers that it has been discarded after asynchronous 
finalization. But I think that's not easy to do, because there might be a 
subsuming operation during {{CompletedCheckpointStore#addCheckpoint}}.
 2. The checkpoint which is doing finalization could NOT be aborted when 
{{CheckpointCoordinator}} is being shut down or the periodic scheduler is being 
stopped. I personally prefer this solution, because it would simplify the 
concurrent conflict scenario and it's much easier to implement. I think 
introducing an atomic boolean might not be enough. It's better to rethink the 
relationship between {{PendingCheckpoint#abort}} and 
{{PendingCheckpoint#finalizeCheckpoint}}. We also need to rewrite part of the 
error handling of the finalization.

BTW, [~yunta], could you share the unit test case that reproduces the scenario 
locally? I want to verify my assumption and solution. The original e2e test 
case is not stable.
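
To make the second strategy slightly more concrete, a tiny, self-contained 
sketch (not the real {{PendingCheckpoint}}) of how abort and finalization could 
be made mutually exclusive with a single atomic state transition instead of a 
separately checked boolean flag:

{code:java}
import java.util.concurrent.atomic.AtomicReference;

public class PendingCheckpointSketch {

    enum State { PENDING, FINALIZING, ABORTED }

    private final AtomicReference<State> state = new AtomicReference<>(State.PENDING);

    // Exactly one of the two transitions below can win, regardless of which
    // thread (finalization vs. shutdown) gets there first.
    boolean startFinalization() {
        return state.compareAndSet(State.PENDING, State.FINALIZING);
    }

    boolean abort() {
        return state.compareAndSet(State.PENDING, State.ABORTED);
    }

    public static void main(String[] args) {
        PendingCheckpointSketch cp = new PendingCheckpointSketch();
        System.out.println("finalize wins: " + cp.startFinalization()); // true
        System.out.println("abort wins:    " + cp.abort());             // false, finalization already started
    }
}
{code}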


was (Author: sleepy):
Hi [~yunta], thanks for the response. If I understand correctly, there is an 
inconsistent state of {{CompletedCheckpointStore}} while stopping a checkpoint 
which is doing asynchronous finalization.

There are two strategy here,
1. The checkpoint which is doing finalization could be aborted when 
{{CheckpointCoordinator}} is being shut down or periodic scheduler is being 
stopped. This is the choice of current implementation. However we didn't handle 
the {{CompletedCheckpointStore}} well. For example it might be better that 
reverting the state of {{CompletedCheckpointStore}} when the 
{{PendingCheckpoint}} finds the discarding after asynchronous finalization. But 
I think it's not easy to do so. Because there might be a subsuming operation 
during {{CompletedCheckpointStore#addCheckpoint}}.
2. The checkpoint which is doing finalization could NOT be aborted when 
{{CheckpointCoordinator}} is being shut down or period scheduler is being 
stopped. I personally prefer this solution, because it could simply the 
concurrent conflict scenario and it's much easier to implement. I think 
introducing an atomic boolean might not be enough. It's better to rethink the 
relationship between {{PendingCheckpoint#abort}} and 
{{PendingCheckpoint#finalizeCheckpoint}}. And we also need to rewrite a part of 
error handling of the finalization. 

BTW, [~yunta] could you share the unit test case which could reproduce the 
scenario locally? I want to verify my suggestion and solution. The original e2e 
test case is not stable.

> Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end 
> test fails with no such file
> ---
>
> Key: FLINK-16770
> URL: https://issues.apache.org/jira/browse/FLINK-16770
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Tests
>Affects Versions: 1.11.0
>Reporter: Zhijiang
>Assignee: Yun Tang
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.11.0
>
> Attachments: e2e-output.log, 
> flink-vsts-standalonesession-0-fv-az53.log
>
>
> The log : 
> [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6603&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5]
>  
> There was also the similar problem in 
> https://issues.apache.org/jira/browse/FLINK-16561, but for the case of no 
> parallelism change. And this case is for scaling up. Not quite sure whether 
> the root cause is the same one.
> {code:java}
> 2020-03-25T06:50:31.3894841Z Running 'Resuming Externalized Checkpoint 
> (rocks, incremental, scale up) end-to-end test'
> 2020-03-25T06:50:31.3895308Z 
> ==
> 2020-03-25T06:50:31.3907274Z TEST_DATA_DIR: 
> /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304
> 2020-03-25T06:50:31.5500274Z Flink dist directory: 
> /home/vsts/work/1/s/flink-dist/target/fl

[jira] [Commented] (FLINK-16770) Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test fails with no such file

2020-04-01 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17073388#comment-17073388
 ] 

Biao Liu commented on FLINK-16770:
--

Hi [~yunta], thanks for the response. If I understand correctly, there is an 
inconsistent state of {{CompletedCheckpointStore}} while stopping a checkpoint 
which is doing asynchronous finalization.

There are two strategy here,
1. The checkpoint which is doing finalization could be aborted when 
{{CheckpointCoordinator}} is being shut down or periodic scheduler is being 
stopped. This is the choice of current implementation. However we didn't handle 
the {{CompletedCheckpointStore}} well. For example it might be better that 
reverting the state of {{CompletedCheckpointStore}} when the 
{{PendingCheckpoint}} finds the discarding after asynchronous finalization. But 
I think it's not easy to do so. Because there might be a subsuming operation 
during {{CompletedCheckpointStore#addCheckpoint}}.
2. The checkpoint which is doing finalization could NOT be aborted when 
{{CheckpointCoordinator}} is being shut down or period scheduler is being 
stopped. I personally prefer this solution, because it could simply the 
concurrent conflict scenario and it's much easier to implement. I think 
introducing an atomic boolean might not be enough. It's better to rethink the 
relationship between {{PendingCheckpoint#abort}} and 
{{PendingCheckpoint#finalizeCheckpoint}}. And we also need to rewrite a part of 
error handling of the finalization. 

BTW, [~yunta] could you share the unit test case which could reproduce the 
scenario locally? I want to verify my suggestion and solution. The original e2e 
test case is not stable.

> Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end 
> test fails with no such file
> ---
>
> Key: FLINK-16770
> URL: https://issues.apache.org/jira/browse/FLINK-16770
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Tests
>Affects Versions: 1.11.0
>Reporter: Zhijiang
>Assignee: Yun Tang
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.11.0
>
> Attachments: e2e-output.log, 
> flink-vsts-standalonesession-0-fv-az53.log
>
>
> The log : 
> [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6603&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5]
>  
> There was also the similar problem in 
> https://issues.apache.org/jira/browse/FLINK-16561, but for the case of no 
> parallelism change. And this case is for scaling up. Not quite sure whether 
> the root cause is the same one.
> {code:java}
> 2020-03-25T06:50:31.3894841Z Running 'Resuming Externalized Checkpoint 
> (rocks, incremental, scale up) end-to-end test'
> 2020-03-25T06:50:31.3895308Z 
> ==
> 2020-03-25T06:50:31.3907274Z TEST_DATA_DIR: 
> /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304
> 2020-03-25T06:50:31.5500274Z Flink dist directory: 
> /home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT
> 2020-03-25T06:50:31.6354639Z Starting cluster.
> 2020-03-25T06:50:31.8871932Z Starting standalonesession daemon on host 
> fv-az655.
> 2020-03-25T06:50:33.5021784Z Starting taskexecutor daemon on host fv-az655.
> 2020-03-25T06:50:33.5152274Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:34.5498116Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:35.6031346Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:36.9848425Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:38.0283377Z Dispatcher REST endpoint is up.
> 2020-03-25T06:50:38.0285490Z Running externalized checkpoints test, with 
> ORIGINAL_DOP=2 NEW_DOP=4 and STATE_BACKEND_TYPE=rocks 
> STATE_BACKEND_FILE_ASYNC=true STATE_BACKEND_ROCKSDB_INCREMENTAL=true 
> SIMULATE_FAILURE=false ...
> 2020-03-25T06:50:46.1754645Z Job (b8cb04e4b1e730585bc616aa352866d0) is 
> running.
> 2020-03-25T06:50:46.1758132Z Waiting for job 
> (b8cb04e4b1e730585bc616aa352866d0) to have at least 1 completed checkpoints 
> ...
> 2020-03-25T06:50:46.3478276Z Waiting for job to process up to 200 records, 
> current progress: 173 records ...
> 2020-03-25T06:50:49.6332988Z Cancelling job b8cb04e4b1e730585bc616aa352866d0.
> 2020-03-25T06:50:50.4875673Z Cancelled job b8cb04e4b1e730585bc616aa352866d0.
> 2020-03-25T06:50:50.5468230Z ls: cannot access 
> '/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304/externalized-chckpt-e2e-backend-dir/b8cb04e4b1e730585bc616aa352866d0/chk-[1-9]*/_meta

[jira] [Commented] (FLINK-16770) Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test fails with no such file

2020-04-01 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17072553#comment-17072553
 ] 

Biao Liu commented on FLINK-16770:
--

Hi [~yunta], thanks for the analysis. I have a question: if chk-8 is dropped 
when cancelling the job, chk-7 would not be subsumed, since the finalization of 
chk-8 would not finish after it is added to the checkpoint store 
asynchronously. The discarding state is checked before doing the subsuming.

Although I haven't checked the test case carefully, I guess this might be 
related to FLINK-14971, which makes the threading model here asynchronous. 
There is a small possibility that a checkpoint is discarded but can still be 
added into the checkpoint store successfully, because currently the 
cancellation and the manipulation of the checkpoint store happen in different 
threads, and there is no big lock around everything as before. Do you think 
that could cause this failure?
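
The suspected interleaving, written out as a generic, self-contained sketch 
(plain Java threads, not Flink code): without a common lock or a common thread, 
the check of the discarding state and the insertion into the store can 
interleave with the discard, so a discarded checkpoint can still land in the 
store.

{code:java}
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class DiscardAddRaceSketch {

    private static final List<String> completedStore = new CopyOnWriteArrayList<>();
    private static volatile boolean discarded = false;

    public static void main(String[] args) throws InterruptedException {
        // "Finalization" thread: checks the discard flag, then adds to the store.
        Thread finalization = new Thread(() -> {
            if (!discarded) {
                sleep(10);                   // window between the check and the add
                completedStore.add("chk-8"); // added even though it may have been discarded in between
            }
        });
        // "Cancellation" thread: discards the checkpoint concurrently.
        Thread cancellation = new Thread(() -> {
            sleep(5);
            discarded = true;                // the files on disk are deleted at this point
        });

        finalization.start();
        cancellation.start();
        finalization.join();
        cancellation.join();

        // Typically prints: discarded=true, store=[chk-8] -> the store references deleted files.
        System.out.println("discarded=" + discarded + ", store=" + completedStore);
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
{code}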

> Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end 
> test fails with no such file
> ---
>
> Key: FLINK-16770
> URL: https://issues.apache.org/jira/browse/FLINK-16770
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Tests
>Affects Versions: 1.11.0
>Reporter: Zhijiang
>Assignee: Yun Tang
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.11.0
>
> Attachments: e2e-output.log, 
> flink-vsts-standalonesession-0-fv-az53.log
>
>
> The log : 
> [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6603&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5]
>  
> There was also the similar problem in 
> https://issues.apache.org/jira/browse/FLINK-16561, but for the case of no 
> parallelism change. And this case is for scaling up. Not quite sure whether 
> the root cause is the same one.
> {code:java}
> 2020-03-25T06:50:31.3894841Z Running 'Resuming Externalized Checkpoint 
> (rocks, incremental, scale up) end-to-end test'
> 2020-03-25T06:50:31.3895308Z 
> ==
> 2020-03-25T06:50:31.3907274Z TEST_DATA_DIR: 
> /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304
> 2020-03-25T06:50:31.5500274Z Flink dist directory: 
> /home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT
> 2020-03-25T06:50:31.6354639Z Starting cluster.
> 2020-03-25T06:50:31.8871932Z Starting standalonesession daemon on host 
> fv-az655.
> 2020-03-25T06:50:33.5021784Z Starting taskexecutor daemon on host fv-az655.
> 2020-03-25T06:50:33.5152274Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:34.5498116Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:35.6031346Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:36.9848425Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:38.0283377Z Dispatcher REST endpoint is up.
> 2020-03-25T06:50:38.0285490Z Running externalized checkpoints test, with 
> ORIGINAL_DOP=2 NEW_DOP=4 and STATE_BACKEND_TYPE=rocks 
> STATE_BACKEND_FILE_ASYNC=true STATE_BACKEND_ROCKSDB_INCREMENTAL=true 
> SIMULATE_FAILURE=false ...
> 2020-03-25T06:50:46.1754645Z Job (b8cb04e4b1e730585bc616aa352866d0) is 
> running.
> 2020-03-25T06:50:46.1758132Z Waiting for job 
> (b8cb04e4b1e730585bc616aa352866d0) to have at least 1 completed checkpoints 
> ...
> 2020-03-25T06:50:46.3478276Z Waiting for job to process up to 200 records, 
> current progress: 173 records ...
> 2020-03-25T06:50:49.6332988Z Cancelling job b8cb04e4b1e730585bc616aa352866d0.
> 2020-03-25T06:50:50.4875673Z Cancelled job b8cb04e4b1e730585bc616aa352866d0.
> 2020-03-25T06:50:50.5468230Z ls: cannot access 
> '/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304/externalized-chckpt-e2e-backend-dir/b8cb04e4b1e730585bc616aa352866d0/chk-[1-9]*/_metadata':
>  No such file or directory
> 2020-03-25T06:50:50.5606260Z Restoring job with externalized checkpoint at . 
> ...
> 2020-03-25T06:50:58.4728245Z 
> 2020-03-25T06:50:58.4732663Z 
> 
> 2020-03-25T06:50:58.4735785Z  The program finished with the following 
> exception:
> 2020-03-25T06:50:58.4737759Z 
> 2020-03-25T06:50:58.4742666Z 
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
> JobGraph.
> 2020-03-25T06:50:58.4746274Z  at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> 2020-03-2

[jira] [Commented] (FLINK-9741) Register JVM metrics for each JM separately

2020-03-30 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-9741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17070980#comment-17070980
 ] 

Biao Liu commented on FLINK-9741:
-

Oops, sorry, I forgot this issue. I gave it a lower priority.

Anyway, we could discuss a bit here.
First, I think we'd better keep the metrics under {{JobManager}} for 
compatibility. [~trohrmann], [~chesnay], do you think it's acceptable to fork 
the JVM metrics from {{JobManager}} to {{RM}}, {{Dispatcher}} or even 
{{JobMaster}}? We could do some optimization so that they share the same metric 
instances. However, it might still annoy users that there are so many duplicated 
metrics under different metric groups, especially when reported by a 
{{MetricReporter}}.

> Register JVM metrics for each JM separately
> ---
>
> Key: FLINK-9741
> URL: https://issues.apache.org/jira/browse/FLINK-9741
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination, Runtime / Metrics
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Biao Liu
>Priority: Major
>
> Currently, the {{Dispatcher}} contains a {{JobManagerMetricGroup}} on which 
> the JVM metrics are registered. the JobManagers only receive a 
> {{JobManagerJobMetricGroup}} and don't register the JVM metrics.
> As the dispatcher and jobmanagers currently run in the same jvm, neither 
> exposing their IDs to the metric system, this doesn't cause problem _right 
> now_ as we can't differentiate between them anyway, but it will bite us down 
> the line if either of the above assumptions is broken.
> For example, with the proposed exposure of JM/Dispatcher IDs in FLINK-9543 we 
> would not expose JVM metrics tied to a JM, but only the Dispatcher.
> I propose to register all JVM metrics for each jobmanager separately to 
> future proof the whole thing.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-13848) Support “scheduleAtFixedRate/scheduleAtFixedDelay” in RpcEndpoint#MainThreadExecutor

2020-03-23 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17064761#comment-17064761
 ] 

Biao Liu commented on FLINK-13848:
--

Just an update: in FLINK-14971 we went with a work-around solution instead, 
which keeps the timer for the periodic triggering of {{CheckpointCoordinator}}. 
So this ticket is no longer necessary for FLINK-13698. Maybe someday we will 
come back to this issue if the feature is needed.

> Support “scheduleAtFixedRate/scheduleAtFixedDelay” in 
> RpcEndpoint#MainThreadExecutor
> 
>
> Key: FLINK-13848
> URL: https://issues.apache.org/jira/browse/FLINK-13848
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently the methods “scheduleAtFixedRate/scheduleAtFixedDelay" of 
> {{RpcEndpoint#MainThreadExecutor}} are not implemented. Because there was no 
> requirement on them before.
> Now we are planning to implement these methods to support periodic checkpoint 
> triggering.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-13848) Support “scheduleAtFixedRate/scheduleAtFixedDelay” in RpcEndpoint#MainThreadExecutor

2020-03-23 Thread Biao Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-13848:
-
Fix Version/s: (was: 1.11.0)

> Support “scheduleAtFixedRate/scheduleAtFixedDelay” in 
> RpcEndpoint#MainThreadExecutor
> 
>
> Key: FLINK-13848
> URL: https://issues.apache.org/jira/browse/FLINK-13848
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently the methods “scheduleAtFixedRate/scheduleAtFixedDelay" of 
> {{RpcEndpoint#MainThreadExecutor}} are not implemented. Because there was no 
> requirement on them before.
> Now we are planning to implement these methods to support periodic checkpoint 
> triggering.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-13848) Support “scheduleAtFixedRate/scheduleAtFixedDelay” in RpcEndpoint#MainThreadExecutor

2020-03-23 Thread Biao Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-13848:
-
Parent: (was: FLINK-13698)
Issue Type: Improvement  (was: Sub-task)

> Support “scheduleAtFixedRate/scheduleAtFixedDelay” in 
> RpcEndpoint#MainThreadExecutor
> 
>
> Key: FLINK-13848
> URL: https://issues.apache.org/jira/browse/FLINK-13848
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently the methods “scheduleAtFixedRate/scheduleAtFixedDelay" of 
> {{RpcEndpoint#MainThreadExecutor}} are not implemented. Because there was no 
> requirement on them before.
> Now we are planning to implement these methods to support periodic checkpoint 
> triggering.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16720) Maven gets stuck downloading artifacts on Azure

2020-03-23 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17064715#comment-17064715
 ] 

Biao Liu commented on FLINK-16720:
--

Another instance, 
https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6514&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=27d1d645-cbce-54e2-51c4-d8b45fe24607

> Maven gets stuck downloading artifacts on Azure
> ---
>
> Key: FLINK-16720
> URL: https://issues.apache.org/jira/browse/FLINK-16720
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / Azure Pipelines
>Affects Versions: 1.11.0
>Reporter: Robert Metzger
>Priority: Major
>
> Logs: 
> https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6509&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=27d1d645-cbce-54e2-51c4-d8b45fe24607
> {code}
> 2020-03-23T08:43:28.4128014Z [INFO] 
> 
> 2020-03-23T08:43:28.4128557Z [INFO] Building flink-avro-confluent-registry 
> 1.11-SNAPSHOT
> 2020-03-23T08:43:28.4129129Z [INFO] 
> 
> 2020-03-23T08:48:47.6591333Z 
> ==
> 2020-03-23T08:48:47.6594540Z Maven produced no output for 300 seconds.
> 2020-03-23T08:48:47.6595164Z 
> ==
> 2020-03-23T08:48:47.6605370Z 
> ==
> 2020-03-23T08:48:47.6605803Z The following Java processes are running (JPS)
> 2020-03-23T08:48:47.6606173Z 
> ==
> 2020-03-23T08:48:47.7710037Z 920 Jps
> 2020-03-23T08:48:47.7778561Z 238 Launcher
> 2020-03-23T08:48:47.9270289Z 
> ==
> 2020-03-23T08:48:47.9270832Z Printing stack trace of Java process 967
> 2020-03-23T08:48:47.9271199Z 
> ==
> 2020-03-23T08:48:48.0165945Z 967: No such process
> 2020-03-23T08:48:48.0218260Z 
> ==
> 2020-03-23T08:48:48.0218736Z Printing stack trace of Java process 238
> 2020-03-23T08:48:48.0219075Z 
> ==
> 2020-03-23T08:48:48.3404066Z 2020-03-23 08:48:48
> 2020-03-23T08:48:48.3404828Z Full thread dump OpenJDK 64-Bit Server VM 
> (25.242-b08 mixed mode):
> 2020-03-23T08:48:48.3405064Z 
> 2020-03-23T08:48:48.3405445Z "Attach Listener" #370 daemon prio=9 os_prio=0 
> tid=0x7fe130001000 nid=0x452 waiting on condition [0x]
> 2020-03-23T08:48:48.3405868Zjava.lang.Thread.State: RUNNABLE
> 2020-03-23T08:48:48.3411202Z 
> 2020-03-23T08:48:48.3413171Z "resolver-5" #105 daemon prio=5 os_prio=0 
> tid=0x7fe1ec2ad800 nid=0x177 waiting on condition [0x7fe1872d9000]
> 2020-03-23T08:48:48.3414175Zjava.lang.Thread.State: WAITING (parking)
> 2020-03-23T08:48:48.3414560Z  at sun.misc.Unsafe.park(Native Method)
> 2020-03-23T08:48:48.3415451Z  - parking to wait for  <0x0003d5a9f828> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> 2020-03-23T08:48:48.3416180Z  at 
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> 2020-03-23T08:48:48.3416825Z  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
> 2020-03-23T08:48:48.3417602Z  at 
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
> 2020-03-23T08:48:48.3418250Z  at 
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
> 2020-03-23T08:48:48.3418930Z  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
> 2020-03-23T08:48:48.3419900Z  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 2020-03-23T08:48:48.3420395Z  at java.lang.Thread.run(Thread.java:748)
> 2020-03-23T08:48:48.3420648Z 
> 2020-03-23T08:48:48.3421424Z "resolver-4" #104 daemon prio=5 os_prio=0 
> tid=0x7fe1ec2ad000 nid=0x176 waiting on condition [0x7fe1863dd000]
> 2020-03-23T08:48:48.3421914Zjava.lang.Thread.State: WAITING (parking)
> 2020-03-23T08:48:48.3422233Z  at sun.misc.Unsafe.park(Native Method)
> 2020-03-23T08:48:48.3422919Z  - parking to wait for  <0x0003d5a9f828> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> 2020-03-23T08:48:48.3423447Z  at 
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> 2020-03-23T08:48:48.3424141Z  at 
> java.util.concurrent

[jira] [Created] (FLINK-16561) Resuming Externalized Checkpoint (rocks, incremental, no parallelism change) end-to-end test fails on Azure

2020-03-11 Thread Biao Liu (Jira)
Biao Liu created FLINK-16561:


 Summary: Resuming Externalized Checkpoint (rocks, incremental, no 
parallelism change) end-to-end test fails on Azure
 Key: FLINK-16561
 URL: https://issues.apache.org/jira/browse/FLINK-16561
 Project: Flink
  Issue Type: Test
  Components: Tests
Affects Versions: 1.11.0
Reporter: Biao Liu


{quote}Caused by: java.io.IOException: Cannot access file system for 
checkpoint/savepoint path 'file://.'.
at 
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpointPointer(AbstractFsCheckpointStorage.java:233)
at 
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpoint(AbstractFsCheckpointStorage.java:110)
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1332)
at 
org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:314)
at 
org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:247)
at 
org.apache.flink.runtime.scheduler.SchedulerBase.(SchedulerBase.java:223)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.(DefaultScheduler.java:118)
at 
org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103)
at 
org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:281)
at 
org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:269)
at 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
at 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
at 
org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.(JobManagerRunnerImpl.java:146)
... 10 more
Caused by: java.io.IOException: Found local file path with authority '.' in 
path 'file://.'. Hint: Did you forget a slash? (correct path would be 
'file:///.')
at 
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:441)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:389)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
at 
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpointPointer(AbstractFsCheckpointStorage.java:230)
... 22 more
{quote}

The original log is here, 
https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6073&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=2b7514ee-e706-5046-657b-3430666e7bd9

There are some similar tickets about this case, but the stack here looks 
different. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14971) Make all the non-IO operations in CheckpointCoordinator single-threaded

2020-03-08 Thread Biao Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-14971:
-
Description: 
Currently the ACK and declined message handling are executed in IO thread. This 
is the only rest part that non-IO operations are executed in IO thread. It 
blocks introducing main thread executor for {{CheckpointCoordinator}}. It would 
be resolved in this task.

After resolving the ACK and declined message issue, the main thread executor 
would be introduced into {{CheckpointCoordinator}} to instead of timer thread. 
However the timer thread would be kept (maybe for a while temporarily) to 
schedule periodic triggering, since FLINK-13848 is not accepted yet.
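
A minimal, generic sketch of that arrangement with plain JDK executors (not the 
actual Flink classes): the timer thread only schedules, while the trigger 
action itself is handed to a single-threaded "main thread" executor.

{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PeriodicTriggerSketch {
    public static void main(String[] args) throws InterruptedException {
        // Stand-ins: a timer thread that only schedules, and a single-threaded
        // "main thread" executor that performs the actual (non-IO) trigger work.
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        ExecutorService mainThreadExecutor = Executors.newSingleThreadExecutor();

        timer.scheduleAtFixedRate(
            // The timer decides *when*; the trigger itself runs in the main thread
            // executor, so all non-IO checkpoint state stays single-threaded.
            () -> mainThreadExecutor.execute(() -> System.out.println("trigger checkpoint")),
            0, 500, TimeUnit.MILLISECONDS);

        Thread.sleep(1600);
        timer.shutdown();
        mainThreadExecutor.shutdown();
    }
}
{code}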

  was:
Currently the ACK and declined message handling are executed in IO thread. This 
is the only rest part that non-IO operations are executed in IO thread. It 
blocks introducing main thread executor for {{CheckpointCoordinator}}. It would 
be resolved in this task.

After resolving the ACK and declined message issue, all operations could be 
executed in main thread. Also we don't need coordinator-wide lock anymore then.





> Make all the non-IO operations in CheckpointCoordinator single-threaded
> ---
>
> Key: FLINK-14971
> URL: https://issues.apache.org/jira/browse/FLINK-14971
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently the ACK and declined message handling are executed in the IO thread. 
> This is the only remaining part where non-IO operations are executed in the IO 
> thread. It blocks introducing the main thread executor for 
> {{CheckpointCoordinator}}. It would be resolved in this task.
> After resolving the ACK and declined message issue, the main thread executor 
> would be introduced into {{CheckpointCoordinator}} to replace the timer 
> thread. However, the timer thread would be kept (maybe temporarily, for a 
> while) to schedule periodic triggering, since FLINK-13848 is not accepted yet.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14971) Make all the non-IO operations in CheckpointCoordinator single-threaded

2020-03-08 Thread Biao Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-14971:
-
Description: 
Currently the ACK and declined message handling are executed in the IO thread. This 
is the only remaining part where non-IO operations are executed in the IO thread. It 
blocks introducing the main thread executor for {{CheckpointCoordinator}}. It would 
be resolved in this task.

After resolving the ACK and declined message issue, all operations could be 
executed in the main thread. Also, we wouldn't need the coordinator-wide lock anymore.




  was:
Currently the ACK and declined message handling are executed in the IO thread. It 
should be moved into the main thread eventually.
After this step, all operations could be executed in the main thread. Also, we 
wouldn't need the coordinator-wide lock anymore.


> Make all the non-IO operations in CheckpointCoordinator single-threaded
> ---
>
> Key: FLINK-14971
> URL: https://issues.apache.org/jira/browse/FLINK-14971
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently the ACK and declined message handling are executed in the IO thread. 
> This is the only remaining part where non-IO operations are executed in the IO 
> thread. It blocks introducing the main thread executor for 
> {{CheckpointCoordinator}}. It would be resolved in this task.
> After resolving the ACK and declined message issue, all operations could be 
> executed in the main thread. Also, we wouldn't need the coordinator-wide lock 
> anymore.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14971) Make all the non-IO operations in CheckpointCoordinator single-threaded

2020-03-08 Thread Biao Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-14971:
-
Summary: Make all the non-IO operations in CheckpointCoordinator 
single-threaded  (was: Move ACK and declined message handling in the same 
thread with triggering)

> Make all the non-IO operations in CheckpointCoordinator single-threaded
> ---
>
> Key: FLINK-14971
> URL: https://issues.apache.org/jira/browse/FLINK-14971
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently the ACK and declined message handling are executed in the IO thread. 
> It should be moved into the main thread eventually.
> After this step, all operations could be executed in the main thread. Also, we 
> wouldn't need the coordinator-wide lock anymore.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15611) KafkaITCase.testOneToOneSources fails on Travis

2020-01-17 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17017789#comment-17017789
 ] 

Biao Liu commented on FLINK-15611:
--

[~jqin], could you take a look at this case?

> KafkaITCase.testOneToOneSources fails on Travis
> ---
>
> Key: FLINK-15611
> URL: https://issues.apache.org/jira/browse/FLINK-15611
> Project: Flink
>  Issue Type: Bug
>Reporter: Yangze Guo
>Priority: Critical
> Fix For: 1.10.0
>
>
> {{The test KafkaITCase.testOneToOneSources failed on Travis.}}
> {code:java}
> 03:15:02,019 INFO  
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl  - 
> Deleting topic scale-down-before-first-checkpoint
> 03:15:02,037 INFO  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase  - 
> 
> Test 
> testScaleDownBeforeFirstCheckpoint(org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase)
>  successfully run.
> 
> 03:15:02,038 INFO  org.apache.flink.streaming.connectors.kafka.KafkaTestBase  
>- -
> 03:15:02,038 INFO  org.apache.flink.streaming.connectors.kafka.KafkaTestBase  
>- Shut down KafkaTestBase 
> 03:15:02,038 INFO  org.apache.flink.streaming.connectors.kafka.KafkaTestBase  
>- -
> 03:15:25,728 INFO  org.apache.flink.streaming.connectors.kafka.KafkaTestBase  
>- -
> 03:15:25,728 INFO  org.apache.flink.streaming.connectors.kafka.KafkaTestBase  
>- KafkaTestBase finished
> 03:15:25,728 INFO  org.apache.flink.streaming.connectors.kafka.KafkaTestBase  
>- -
> 03:15:25.731 [INFO] Tests run: 12, Failures: 0, Errors: 0, Skipped: 0, Time 
> elapsed: 245.845 s - in 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase
> 03:15:26.099 [INFO] 
> 03:15:26.099 [INFO] Results:
> 03:15:26.099 [INFO] 
> 03:15:26.099 [ERROR] Failures: 
> 03:15:26.099 [ERROR]   
> KafkaITCase.testOneToOneSources:97->KafkaConsumerTestBase.runOneToOneExactlyOnceTest:862
>  Test failed: Job execution failed.
> {code}
> https://api.travis-ci.com/v3/job/276124537/log.txt



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14742) Unstable tests TaskExecutorTest#testSubmitTaskBeforeAcceptSlot

2020-01-16 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14742?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17017779#comment-17017779
 ] 

Biao Liu commented on FLINK-14742:
--

Such a subtle case, especially since it couldn't be reproduced :)
I have checked the test case but didn't find any clue. Nice work [~kkl0u]!

> Unstable tests TaskExecutorTest#testSubmitTaskBeforeAcceptSlot
> --
>
> Key: FLINK-14742
> URL: https://issues.apache.org/jira/browse/FLINK-14742
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: Zili Chen
>Assignee: Kostas Kloudas
>Priority: Critical
> Fix For: 1.10.0
>
>
> deadlock.
> {code}
> "main" #1 prio=5 os_prio=0 tid=0x7f1f8800b800 nid=0x356 waiting on 
> condition [0x7f1f8e65c000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0x86e9e9c0> (a 
> java.util.concurrent.CompletableFuture$Signaller)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
>   at 
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>   at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
>   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>   at 
> org.apache.flink.runtime.taskexecutor.TaskExecutorTest.testSubmitTaskBeforeAcceptSlot(TaskExecutorTest.java:1108)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
> {code}
> full log https://api.travis-ci.org/v3/job/611275566/log.txt



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15611) KafkaITCase.testOneToOneSources fails on Travis

2020-01-16 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17017767#comment-17017767
 ] 

Biao Liu commented on FLINK-15611:
--

{quote}
...
Caused by: java.lang.Exception: Received a duplicate: 4924
at 
org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink.invoke(ValidatingExactlyOnceSink.java:57)
at 
org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink.invoke(ValidatingExactlyOnceSink.java:36)
at 
org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:170)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:702)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:527)
at java.lang.Thread.run(Thread.java:748)
{quote}

It looks like a serious issue. The exactly-once semantics here seem to be 
broken.

> KafkaITCase.testOneToOneSources fails on Travis
> ---
>
> Key: FLINK-15611
> URL: https://issues.apache.org/jira/browse/FLINK-15611
> Project: Flink
>  Issue Type: Bug
>Reporter: Yangze Guo
>Priority: Critical
> Fix For: 1.10.0
>
>
> {{The test KafkaITCase.testOneToOneSources failed on Travis.}}
> {code:java}
> 03:15:02,019 INFO  
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl  - 
> Deleting topic scale-down-before-first-checkpoint
> 03:15:02,037 INFO  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase  - 
> 
> Test 
> testScaleDownBeforeFirstCheckpoint(org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase)
>  successfully run.
> 
> 03:15:02,038 INFO  org.apache.flink.streaming.connectors.kafka.KafkaTestBase  
>- -
> 03:15:02,038 INFO  org.apache.flink.streaming.connectors.kafka.KafkaTestBase  
>- Shut down KafkaTestBase 
> 03:15:02,038 INFO  org.apache.flink.streaming.connectors.kafka.KafkaTestBase  
>- -
> 03:15:25,728 INFO  org.apache.flink.streaming.connectors.kafka.KafkaTestBase  
>- -
> 03:15:25,728 INFO  org.apache.flink.streaming.connectors.kafka.KafkaTestBase  
>- KafkaTestBase finished
> 03:15:25,728 INFO  org.apache.flink.streaming.connectors.kafka.KafkaTestBase  
>- -
> 03:15:25.731 [INFO] Tests run: 12, Failures: 0, Errors: 0, Skipped: 0, Time 
> elapsed: 245.845 s - in 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase
> 03:15:26.099 [INFO] 
> 03:15:26.099 [INFO] Results:
> 03:15:26.099 [INFO] 
> 03:15:26.099 [ERROR] Failures: 
> 03:15:26.099 [ERROR]   
> KafkaITCase.testOneToOneSources:97->KafkaConsumerTestBase.runOneToOneExactlyOnceTest:862
>  Test failed: Job execution failed.
> {code}
> https://api.travis-ci.com/v3/job/276124537/log.txt



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15541) FlinkKinesisConsumerTest.testSourceSynchronization is unstable on travis.

2020-01-12 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17014083#comment-17014083
 ] 

Biao Liu commented on FLINK-15541:
--

This is an obvious bug in the test case that makes it unstable. Please check the 
PR; it's easy to understand.

> FlinkKinesisConsumerTest.testSourceSynchronization is unstable on travis.
> -
>
> Key: FLINK-15541
> URL: https://issues.apache.org/jira/browse/FLINK-15541
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Affects Versions: 1.10.0
>Reporter: Xintong Song
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.10.0
>
>
> [https://api.travis-ci.org/v3/job/634712405/log.txt]
> {code:java}
> 13:16:19.144 [ERROR] Tests run: 11, Failures: 1, Errors: 0, Skipped: 0, Time 
> elapsed: 4.338 s <<< FAILURE! - in 
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumerTest
> 13:16:19.144 [ERROR] 
> testSourceSynchronization(org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumerTest)
>   Time elapsed: 1.001 s  <<< FAILURE!
> java.lang.AssertionError: expected null, but was: expected>
>   at 
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumerTest.testSourceSynchronization(FlinkKinesisConsumerTest.java:1018)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15503) FileUploadHandlerTest.testMixedMultipart and FileUploadHandlerTest. testUploadCleanupOnUnknownAttribute failed on Azure

2020-01-08 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17010674#comment-17010674
 ] 

Biao Liu commented on FLINK-15503:
--

This case cost over 100 seconds. It's incredibly slow. Maybe the other tests in 
the same class are just lucky enough to pass?


 BTW, can't we get the full log (through transfer.sh) under the Azure environment 
currently? I found a relevant discussion [1] on the mailing list.

 

[1] 
[http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Migrate-build-infrastructure-from-Travis-CI-to-Azure-Pipelines-tt35538.html#a35641]

> FileUploadHandlerTest.testMixedMultipart and FileUploadHandlerTest. 
> testUploadCleanupOnUnknownAttribute failed on Azure
> ---
>
> Key: FLINK-15503
> URL: https://issues.apache.org/jira/browse/FLINK-15503
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / REST, Tests
>Affects Versions: 1.10.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.10.0
>
>
> The tests {{FileUploadHandlerTest.testMixedMultipart}} and 
> {{FileUploadHandlerTest. testUploadCleanupOnUnknownAttribute}} failed on 
> Azure with 
> {code}
> 2020-01-07T09:32:06.9840445Z [ERROR] 
> testUploadCleanupOnUnknownAttribute(org.apache.flink.runtime.rest.FileUploadHandlerTest)
>   Time elapsed: 12.457 s  <<< ERROR!
> 2020-01-07T09:32:06.9850865Z java.net.SocketTimeoutException: timeout
> 2020-01-07T09:32:06.9851650Z  at 
> org.apache.flink.runtime.rest.FileUploadHandlerTest.testUploadCleanupOnUnknownAttribute(FileUploadHandlerTest.java:234)
> 2020-01-07T09:32:06.9852910Z Caused by: java.net.SocketException: Socket 
> closed
> 2020-01-07T09:32:06.9853465Z  at 
> org.apache.flink.runtime.rest.FileUploadHandlerTest.testUploadCleanupOnUnknownAttribute(FileUploadHandlerTest.java:234)
> 2020-01-07T09:32:06.9853855Z 
> 2020-01-07T09:32:06.9854362Z [ERROR] 
> testMixedMultipart(org.apache.flink.runtime.rest.FileUploadHandlerTest)  Time 
> elapsed: 10.091 s  <<< ERROR!
> 2020-01-07T09:32:06.9855125Z java.net.SocketTimeoutException: Read timed out
> 2020-01-07T09:32:06.9855652Z  at 
> org.apache.flink.runtime.rest.FileUploadHandlerTest.testMixedMultipart(FileUploadHandlerTest.java:154)
> 2020-01-07T09:32:06.9856034Z 
> {code}
> https://dev.azure.com/rmetzger/Flink/_build/results?buildId=4159&view=results



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-13905) Separate checkpoint triggering into stages

2019-12-10 Thread Biao Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-13905:
-
Priority: Major  (was: Blocker)

> Separate checkpoint triggering into stages
> --
>
> Key: FLINK-13905
> URL: https://issues.apache.org/jira/browse/FLINK-13905
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently {{CheckpointCoordinator#triggerCheckpoint}} includes some heavy IO 
> operations. We plan to separate the triggering into different stages. The IO 
> operations are executed in IO threads, while other on-memory operations are 
> not.
> This is a preparation for making all on-memory operations of 
> {{CheckpointCoordinator}} single threaded (in main thread).
> Note that we could not put on-memory operations of triggering into main 
> thread directly now. Because there are still some operations on a heavy lock 
> (coordinator-wide).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-13848) Support “scheduleAtFixedRate/scheduleAtFixedDelay” in RpcEndpoint#MainThreadExecutor

2019-12-10 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16992662#comment-16992662
 ] 

Biao Liu commented on FLINK-13848:
--

[~trohrmann], this is an alternative implementation. [~pnowojski] and I 
haven't reached an agreement yet. It's probably needed, I guess. We will get back 
to this discussion after FLINK-13905.

BTW, since FLINK-15132 is a false alarm, this ticket and its parent ticket are 
no longer blockers.

> Support “scheduleAtFixedRate/scheduleAtFixedDelay” in 
> RpcEndpoint#MainThreadExecutor
> 
>
> Key: FLINK-13848
> URL: https://issues.apache.org/jira/browse/FLINK-13848
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently the methods “scheduleAtFixedRate/scheduleAtFixedDelay" of 
> {{RpcEndpoint#MainThreadExecutor}} are not implemented. Because there was no 
> requirement on them before.
> Now we are planning to implement these methods to support periodic checkpoint 
> triggering.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15032) Remove the eager serialization from `RemoteRpcInvocation`

2019-12-09 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16991225#comment-16991225
 ] 

Biao Liu commented on FLINK-15032:
--

[~trohrmann], thanks for the explanation! I think you have raised a critical 
question.

Besides the message size checking, the serialization checking might be stickier. 
We might avoid the message size checking somehow, see the discussion under 
FLINK-4399, but I don't think we could do the same thing for the serialization 
checking. It seems that this synchronous pre-checking cannot be avoided, because 
some RPC invocations are "fire and forget". It's hard to tell the invoker that 
there is something wrong with the RPC message without a synchronous pre-checking.

Here is a rough idea of mine: maybe we could do this optimization only for 
invocations without user-defined fields. I believe we could guarantee that the 
message is small and serializable in some scenarios.
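Just to make the idea more concrete, here is a rough sketch (a hypothetical class, not the actual {{RemoteRpcInvocation}}) of what postponing the serialization to {{writeObject}} could look like:
{code:java}
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical sketch: keep the arguments as transient fields and only
// serialize them when the message is actually written out (i.e. sent by Akka).
public class LazyRpcInvocation implements Serializable {

    private static final long serialVersionUID = 1L;

    private final String methodName;

    // Not serialized eagerly; written manually in writeObject.
    private transient Object[] args;

    public LazyRpcInvocation(String methodName, Object[] args) {
        this.methodName = methodName;
        this.args = args;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        // The serialization (and any failure) happens here, at send time.
        out.writeObject(args);
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        args = (Object[]) in.readObject();
    }
}
{code}
The trade-off is exactly the one above: a non-serializable or oversized argument would only surface at send time, which is problematic for the "fire and forget" invocations.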

> Remove the eager serialization from `RemoteRpcInvocation` 
> --
>
> Key: FLINK-15032
> URL: https://issues.apache.org/jira/browse/FLINK-15032
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Guowei Ma
>Priority: Major
>
>     Currently, the constructor of `RemoteRpcInvocation` serializes the 
> `parameterTypes` and `arg` of an RPC call. This could lead to a problem:
> Consider a job that has 1k parallelism and a 1 MB union list state. When 
> deploying the 1k tasks, the eager serialization would use 1 GB of memory 
> instantly (sometimes the serialization amplifies the memory usage). However, 
> the serialized object is only used when Akka sends the message. So we could 
> reduce the memory pressure if we only serialized the object when the message 
> is actually sent by Akka.
> Akka serializes the message in the end anyway, and all the XXXGateway related 
> classes are visible at the RPC level. Because of that, I think the eager 
> serialization in the constructor of `RemoteRpcInvocation` could be avoided. I 
> also did a simple test and found this could reduce the time cost of the RPC 
> call. For 1k RPC calls with a 1 MB `String` message, the current version 
> costs around 2700 ms, while the non-serializing version costs about 37 ms.
>  
> In summary, this Jira proposes to remove the eager serialization at the 
> constructor of `RemoteRpcInvocation`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15043) Possible thread leak in task manager

2019-12-04 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16988367#comment-16988367
 ] 

Biao Liu commented on FLINK-15043:
--

[~Jordan.Hatcher], that's OK. Glad to hear it has been solved :)

> Possible thread leak in task manager
> 
>
> Key: FLINK-15043
> URL: https://issues.apache.org/jira/browse/FLINK-15043
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.8.2
>Reporter: Jordan Hatcher
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15043) Possible thread leak in task manager

2019-12-03 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16987497#comment-16987497
 ] 

Biao Liu commented on FLINK-15043:
--

Hi [~Jordan.Hatcher], could you explain why you closed the ticket? Have you 
found the root cause?

> Possible thread leak in task manager
> 
>
> Key: FLINK-15043
> URL: https://issues.apache.org/jira/browse/FLINK-15043
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.8.2
>Reporter: Jordan Hatcher
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-15032) Remove the eagerly serialization from `RemoteRpcInvocation`

2019-12-03 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16986721#comment-16986721
 ] 

Biao Liu edited comment on FLINK-15032 at 12/3/19 8:52 AM:
---

Hi [~maguowei], thanks for the impressive proposal.

Though I have not fully understood the scenario, here are some questions of 
mine.
{quote}Consider a job that has 1k parallelism and has a 1m union list state. 
When deploying the 1k tasks, the eager serialization would use 1G memory 
instantly(Some time the serialization amplifies the memory usage). However, the 
serialized object is only used when the Akka sends the message. So we could 
reduce the memory pressure if we only serialize the object when the message 
would be sent by the Akka.
{quote}
Do you mean we could postpone the serialization till 
{{RemoteRpcInvocation#writeObject}}?
{quote}Furthermore, Akka would serialize the message at last and all the 
XXXGateway related class could be visible by the RPC level. Because of that, I 
think the serialization in the constructor of `RemoteRpcInvocation` could be 
avoided.
{quote}
Could you explain it a bit more? Do you mean there is no visible issue of 
postponing the serialization? Or we don't need serialization anymore?
{quote}In summary, this Jira proposes to remove the serialization at the 
constructor of `RemoteRpcInvocation`.
{quote}
Do we still need serialization in {{writeObject}} after removing the serialization 
in the constructor?


was (Author: sleepy):
Hi [~maguowei], thanks for the impressive proposal.

I have not fully understood the scenario. Here are some questions of mine.

{quote}Consider a job that has 1k parallelism and has a 1m union list state. 
When deploying the 1k tasks, the eager serialization would use 1G memory 
instantly(Some time the serialization amplifies the memory usage). However, the 
serialized object is only used when the Akka sends the message.  So we could 
reduce the memory pressure if we only serialize the object when the message 
would be sent by the Akka.{quote}
Do you mean we could postpone the serialization till 
{{RemoteRpcInvocation#writeObject}}?

{quote}Furthermore, Akka would serialize the message at last and all the 
XXXGateway related class could be visible by the RPC level. Because of that, I 
think the serialization in the constructor of `RemoteRpcInvocation` could be 
avoided.{quote}
Could you explain it a bit more? Do you mean there is no visible issue of 
postponing the serialization? Or we don't need serialization anymore?

{quote}In summary, this Jira proposes to remove the serialization at the 
constructor of `RemoteRpcInvocation`.{quote}
Do we still need serialization in {{writeObject}} after removing the serialization 
in the constructor?


> Remove the eagerly serialization from `RemoteRpcInvocation` 
> 
>
> Key: FLINK-15032
> URL: https://issues.apache.org/jira/browse/FLINK-15032
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Guowei Ma
>Priority: Major
>
>     Currently, the constructor of `RemoteRpcInvocation` serializes the 
> `parameterTypes` and `arg` of an RPC call. This could lead to two problems:
>  # Consider a job that has 1k parallelism and a 1 MB union list state. When 
> deploying the 1k tasks, the eager serialization would use 1 GB of memory 
> instantly (sometimes the serialization amplifies the memory usage). However, 
> the serialized object is only used when Akka sends the message. So we could 
> reduce the memory pressure if we only serialized the object when the message 
> is actually sent by Akka.
>  # Furthermore, Akka serializes the message in the end anyway, and all the 
> XXXGateway related classes are visible at the RPC level. Because of that, I 
> think the serialization in the constructor of `RemoteRpcInvocation` could be 
> avoided. I also did a simple test and found this could reduce the time cost 
> of the RPC call. For 1k RPC calls with a 1 MB `String` message, the current 
> version costs around 2700 ms, while the non-serializing version costs about 
> 37 ms.
>  
> In summary, this Jira proposes to remove the serialization at the constructor 
> of `RemoteRpcInvocation`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15032) Remove the eagerly serialization from `RemoteRpcInvocation`

2019-12-03 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16986721#comment-16986721
 ] 

Biao Liu commented on FLINK-15032:
--

Hi [~maguowei], thanks for the impressive proposal.

I have not fully understood the scenario. Here are some questions of mine.

{quote}Consider a job that has 1k parallelism and has a 1m union list state. 
When deploying the 1k tasks, the eager serialization would use 1G memory 
instantly(Some time the serialization amplifies the memory usage). However, the 
serialized object is only used when the Akka sends the message.  So we could 
reduce the memory pressure if we only serialize the object when the message 
would be sent by the Akka.{quote}
Do you mean we could postpone the serialization till 
{{RemoteRpcInvocation#writeObject}}?

{quote}Furthermore, Akka would serialize the message at last and all the 
XXXGateway related class could be visible by the RPC level. Because of that, I 
think the serialization in the constructor of `RemoteRpcInvocation` could be 
avoided.{quote}
Could you explain it a bit more? Do you mean there is no visible issue of 
postponing the serialization? Or we don't need serialization anymore?

{quote}In summary, this Jira proposes to remove the serialization at the 
constructor of `RemoteRpcInvocation`.{quote}
Do we still need serialization in {{writeObject}} after removing the serialization 
in the constructor?


> Remove the eagerly serialization from `RemoteRpcInvocation` 
> 
>
> Key: FLINK-15032
> URL: https://issues.apache.org/jira/browse/FLINK-15032
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Guowei Ma
>Priority: Major
>
>     Currently, the constructor of `RemoteRpcInvocation` serializes the 
> `parameterTypes` and `arg` of an RPC call. This could lead to two problems:
>  # Consider a job that has 1k parallelism and a 1 MB union list state. When 
> deploying the 1k tasks, the eager serialization would use 1 GB of memory 
> instantly (sometimes the serialization amplifies the memory usage). However, 
> the serialized object is only used when Akka sends the message. So we could 
> reduce the memory pressure if we only serialized the object when the message 
> is actually sent by Akka.
>  # Furthermore, Akka serializes the message in the end anyway, and all the 
> XXXGateway related classes are visible at the RPC level. Because of that, I 
> think the serialization in the constructor of `RemoteRpcInvocation` could be 
> avoided. I also did a simple test and found this could reduce the time cost 
> of the RPC call. For 1k RPC calls with a 1 MB `String` message, the current 
> version costs around 2700 ms, while the non-serializing version costs about 
> 37 ms.
>  
> In summary, this Jira proposes to remove the serialization at the constructor 
> of `RemoteRpcInvocation`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14950) Support getKey in WindowOperator.Context

2019-11-29 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16984922#comment-16984922
 ] 

Biao Liu commented on FLINK-14950:
--

[~wind_ljy],

{quote}By the way, Since the key access is not allowed we have to call 
context.toString() and analyze the pattern to get the binding key.
{quote}
Sorry to hear that :(

I'm not saying it's a bad idea to make the key accessible in the window context. 
It's just that, you know, I didn't see a strong reason to do so. Your case seems 
to be quite complicated, and I still have not fully understood your scenario. 
Even if you could get the key from the context, how would you remove state from 
all windows associated with the key? The trigger is per window, not per key.

{quote}Maybe we should wait for more responses from others.
{quote}
Yeah, that would be great if someone else could give us a better idea.
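For reference, the {{toString()}} workaround mentioned above could look roughly like the sketch below; the {{key=..., window=}} pattern is just an assumption about the current string format, so it is obviously fragile and not something to encourage:
{code:java}
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Fragile workaround sketch: extract the key from the window context's
// toString() output, assuming it contains a "key=<value>, window=" fragment.
public final class WindowContextKeyParser {

    private static final Pattern KEY_PATTERN = Pattern.compile("key=(.*?), window=");

    private WindowContextKeyParser() {}

    public static String extractKey(Object windowContext) {
        Matcher matcher = KEY_PATTERN.matcher(String.valueOf(windowContext));
        return matcher.find() ? matcher.group(1) : null;
    }
}
{code}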

> Support getKey in WindowOperator.Context
> 
>
> Key: FLINK-14950
> URL: https://issues.apache.org/jira/browse/FLINK-14950
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Affects Versions: 1.9.1
>Reporter: Jiayi Liao
>Priority: Major
>
> In our scenario, user needs to access the key of {{WindowOperator.Context}} 
> to determine how to deal with the window.
> I think it's reasonable to support {{getKey()}} method in 
> {{WindowOperator.Context}}. 
> cc [~aljoscha]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14950) Support getKey in WindowOperator.Context

2019-11-29 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16984853#comment-16984853
 ] 

Biao Liu commented on FLINK-14950:
--

Hi [~wind_ljy], thanks for explanation.

I'm not sure I have fully understood your requirement; I guess there are a lot 
of other details in your scenario. Anyway, setting those aside, it seems that you 
want a periodic check for each key (chat room) in the window operator. I'm 
wondering whether that is general enough. For example, what if someone else wants 
a periodic check based on other fields (not the key)? It looks a bit too 
customized from my perspective.

> Support getKey in WindowOperator.Context
> 
>
> Key: FLINK-14950
> URL: https://issues.apache.org/jira/browse/FLINK-14950
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Affects Versions: 1.9.1
>Reporter: Jiayi Liao
>Priority: Major
>
> In our scenario, user needs to access the key of {{WindowOperator.Context}} 
> to determine how to deal with the window.
> I think it's reasonable to support {{getKey()}} method in 
> {{WindowOperator.Context}}. 
> cc [~aljoscha]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14971) Move ACK and declined message handling in the same thread with triggering

2019-11-27 Thread Biao Liu (Jira)
Biao Liu created FLINK-14971:


 Summary: Move ACK and declined message handling in the same thread 
with triggering
 Key: FLINK-14971
 URL: https://issues.apache.org/jira/browse/FLINK-14971
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Checkpointing
Reporter: Biao Liu
 Fix For: 1.10.0


Currently the ACK and declined message handling are executed in the IO thread. It 
should be moved into the main thread eventually.
After this step, all operations could be executed in the main thread. Also, we 
wouldn't need the coordinator-wide lock anymore.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14950) Support getKey in WindowOperator.Context

2019-11-26 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16983131#comment-16983131
 ] 

Biao Liu commented on FLINK-14950:
--

Hi Jiayi,

It's an interesting ticket. Could you provide more details of your scenario?
For example,
1. Why do you need the key from context?
2. Could it be satisfied by extracting the key field from an element stored in 
the window state?

> Support getKey in WindowOperator.Context
> 
>
> Key: FLINK-14950
> URL: https://issues.apache.org/jira/browse/FLINK-14950
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Affects Versions: 1.9.1
>Reporter: Jiayi Liao
>Priority: Major
>
> In our scenario, user needs to access the key of {{WindowOperator.Context}} 
> to determine how to deal with the window.
> I think it's reasonable to support {{getKey()}} method in 
> {{WindowOperator.Context}}. 
> cc [~aljoscha]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14344) A preparation for snapshotting master hook state asynchronously

2019-11-13 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16973128#comment-16973128
 ] 

Biao Liu commented on FLINK-14344:
--

[~pnowojski], makes sense. I have updated the title. The remaining part of 
"snapshot master hook state asynchronously" would be included in FLINK-13905.

> A preparation for snapshotting master hook state asynchronously
> ---
>
> Key: FLINK-14344
> URL: https://issues.apache.org/jira/browse/FLINK-14344
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently we snapshot the master hook state synchronously. As a part of 
> reworking threading model of {{CheckpointCoordinator}}, we have to make this 
> non-blocking to satisfy the requirement of running in main thread.
> The behavior of snapshotting master hook state is similar to task state 
> snapshotting. Master state snapshotting is taken before task state 
> snapshotting. Because in master hook, there might be external system 
> initialization which task state snapshotting might depend on.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14344) A preparation for snapshotting master hook state asynchronously

2019-11-13 Thread Biao Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-14344:
-
Summary: A preparation for snapshotting master hook state asynchronously  
(was: Snapshot master hook state asynchronously)

> A preparation for snapshotting master hook state asynchronously
> ---
>
> Key: FLINK-14344
> URL: https://issues.apache.org/jira/browse/FLINK-14344
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently we snapshot the master hook state synchronously. As a part of 
> reworking threading model of {{CheckpointCoordinator}}, we have to make this 
> non-blocking to satisfy the requirement of running in main thread.
> The behavior of snapshotting master hook state is similar to task state 
> snapshotting. Master state snapshotting is taken before task state 
> snapshotting. Because in master hook, there might be external system 
> initialization which task state snapshotting might depend on.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14344) Snapshot master hook state asynchronously

2019-11-06 Thread Biao Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-14344:
-
Release Note: Makes the semantics of 
MasterTriggerRestoreHook#triggerCheckpoint clearer. It should be non-blocking; 
any heavy operation should be executed asynchronously with the given executor.  
(was: Makes the semantics of {{MasterTriggerRestoreHook#triggerCheckpoint}} 
clearer. It should be non-blocking; any heavy operation should be executed 
asynchronously with the given executor.)

> Snapshot master hook state asynchronously
> -
>
> Key: FLINK-14344
> URL: https://issues.apache.org/jira/browse/FLINK-14344
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently we snapshot the master hook state synchronously. As a part of 
> reworking threading model of {{CheckpointCoordinator}}, we have to make this 
> non-blocking to satisfy the requirement of running in main thread.
> The behavior of snapshotting master hook state is similar to task state 
> snapshotting. Master state snapshotting is taken before task state 
> snapshotting. Because in master hook, there might be external system 
> initialization which task state snapshotting might depend on.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14344) Snapshot master hook state asynchronously

2019-11-06 Thread Biao Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-14344:
-
Release Note: Makes the semantics of 
{{MasterTriggerRestoreHook#triggerCheckpoint}} clearer. It should be 
non-blocking; any heavy operation should be executed asynchronously with the 
given executor.

> Snapshot master hook state asynchronously
> -
>
> Key: FLINK-14344
> URL: https://issues.apache.org/jira/browse/FLINK-14344
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently we snapshot the master hook state synchronously. As a part of 
> reworking threading model of {{CheckpointCoordinator}}, we have to make this 
> non-blocking to satisfy the requirement of running in main thread.
> The behavior of snapshotting master hook state is similar to task state 
> snapshotting. Master state snapshotting is taken before task state 
> snapshotting. Because in master hook, there might be external system 
> initialization which task state snapshotting might depend on.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14344) Snapshot master hook state asynchronously

2019-11-04 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16966566#comment-16966566
 ] 

Biao Liu commented on FLINK-14344:
--

[~pnowojski], 
Yes, BTW, I just resolved a tiny conflict with master branch.

> Snapshot master hook state asynchronously
> -
>
> Key: FLINK-14344
> URL: https://issues.apache.org/jira/browse/FLINK-14344
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently we snapshot the master hook state synchronously. As a part of 
> reworking threading model of {{CheckpointCoordinator}}, we have to make this 
> non-blocking to satisfy the requirement of running in main thread.
> The behavior of snapshotting master hook state is similar to task state 
> snapshotting. Master state snapshotting is taken before task state 
> snapshotting. Because in master hook, there might be external system 
> initialization which task state snapshotting might depend on.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14344) Snapshot master hook state asynchronously

2019-11-04 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16966521#comment-16966521
 ] 

Biao Liu commented on FLINK-14344:
--

[~pnowojski],
{quote}That you would prefer to keep the "2nd semantic", the one that's now 
explained in the java doc?{quote}
Yes, I would prefer the 2nd semantic to keep compatibility.

> Snapshot master hook state asynchronously
> -
>
> Key: FLINK-14344
> URL: https://issues.apache.org/jira/browse/FLINK-14344
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently we snapshot the master hook state synchronously. As a part of 
> reworking threading model of {{CheckpointCoordinator}}, we have to make this 
> non-blocking to satisfy the requirement of running in main thread.
> The behavior of snapshotting master hook state is similar to task state 
> snapshotting. Master state snapshotting is taken before task state 
> snapshotting. Because in master hook, there might be external system 
> initialization which task state snapshotting might depend on.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14598) The use of FlinkUserCodeClassLoaders is unreasonable.

2019-11-04 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16966485#comment-16966485
 ] 

Biao Liu commented on FLINK-14598:
--

There is already a similar ticket (FLINK-13749) open, which you might be 
interested in.

> The use of FlinkUserCodeClassLoaders is unreasonable.
> -
>
> Key: FLINK-14598
> URL: https://issues.apache.org/jira/browse/FLINK-14598
> Project: Flink
>  Issue Type: Bug
>  Components: Client / Job Submission, Runtime / Task, Table SQL / 
> Client
>Affects Versions: 1.9.1
>Reporter: luojiangyu
>Priority: Major
>
> The Client and SQL Client use the classloader generated by invoking 
> FlinkUserCodeClassLoaders.parentFirst, while the runtime uses the classloader 
> generated according to the CLASSLOADER_RESOLVE_ORDER configuration (the default 
> value is child-first). The runtime and the client generate their classloaders 
> in different ways, which is unreasonable and causes problems in production.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-13905) Separate checkpoint triggering into stages

2019-11-01 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16964944#comment-16964944
 ] 

Biao Liu commented on FLINK-13905:
--

[~pnowojski],

{quote}If all things equal, I would prefer to postpone FLINK-13848 as much as 
possible, as that would be another chunk of code to review/maintain.{quote}
Fair enough. FLINK-13848 is quite independent. We could come back to it when we 
really need it.

I'll try to write the ideas down to see what it looks like. Maybe we will find 
a better solution based on that.

> Separate checkpoint triggering into stages
> --
>
> Key: FLINK-13905
> URL: https://issues.apache.org/jira/browse/FLINK-13905
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
> Fix For: 1.10.0
>
>
> Currently {{CheckpointCoordinator#triggerCheckpoint}} includes some heavy IO 
> operations. We plan to separate the triggering into different stages. The IO 
> operations are executed in IO threads, while other on-memory operations are 
> not.
> This is a preparation for making all on-memory operations of 
> {{CheckpointCoordinator}} single threaded (in main thread).
> Note that we could not put on-memory operations of triggering into main 
> thread directly now. Because there are still some operations on a heavy lock 
> (coordinator-wide).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14344) Snapshot master hook state asynchronously

2019-11-01 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16964939#comment-16964939
 ] 

Biao Liu commented on FLINK-14344:
--

Hi [~pnowojski],
{quote}this is `PublicEvolving` class, so we are allowed to change it.
{quote}
Yes, we could. I'm just not sure whether it was designed to support a feature like 
this, i.e. initializing an external system before snapshotting the task state. 
Maybe the original author [~sewen] could give us more background information?

The Pravega connector maintainer responded that my guess is right: they execute 
`triggerCheckpoint` asynchronously. I have left another message to ask them 
whether they depend on the sequential order of triggering the master hook and 
snapshotting the task state.
{quote}how difficult would it be to provide the 2nd semantic? Would dropping 
the synchronous hooks support and providing just the 1st semantic be 
significantly easier to write/support/maintain?
{quote}
Based on my POC, it's not much harder to provide the 2nd semantic from the 
perspective of implementation.

If it's a kind of "pure" master state, not master hook of 
{{ExternallyInducedSource}}, I think I'll choose the 1st semantic. There should 
be no dependence between master state and task state. But the semantic of 
master hook is somewhat special, you know, I tend to leave it alone (at-least 
before we have thought about it clearly). Of course, we'd better wait a bit for 
the feedback of Pervega connector maintainers.

> Snapshot master hook state asynchronously
> -
>
> Key: FLINK-14344
> URL: https://issues.apache.org/jira/browse/FLINK-14344
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently we snapshot the master hook state synchronously. As a part of 
> reworking threading model of {{CheckpointCoordinator}}, we have to make this 
> non-blocking to satisfy the requirement of running in main thread.
> The behavior of snapshotting master hook state is similar to task state 
> snapshotting. Master state snapshotting is taken before task state 
> snapshotting. Because in master hook, there might be external system 
> initialization which task state snapshotting might depend on.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-13905) Separate checkpoint triggering into stages

2019-11-01 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16964855#comment-16964855
 ] 

Biao Liu commented on FLINK-13905:
--

Hi [~pnowojski],
{quote}So the periodic trigger would, if there is an ongoing chain of A->B->C, 
will just enque a request in this queue, otherwise it would trigger "A". Then 
we also need a manual logic in A, B and C, that if they fail, we re-check the 
queue or if "C" completes successfully, it also rechecks the queue?{quote}
Yes, exactly. There will be a re-check when a trigger finishes, no matter 
whether it succeeds or fails.

{quote}Isn't it almost the same logic as scheduling the next checkpoint with a 
delay manually from A, B or C? Without the need for FLINK-13848? {quote}
Yes, if checkpoints were triggered manually, we wouldn't need FLINK-13848 or the 
queue mentioned for the periodic triggering. But there is one thing that blocks 
this approach: the savepoint. A savepoint can be triggered at any time, so we 
have to queue the savepoint trigger request somehow if there is a checkpoint or 
savepoint ongoing. The queuing and re-checking logic still can't be avoided, so 
the manual triggering seems less meaningful.

{quote}Side note, haven't you implemented something similar or exactly this in 
one of the PRs, in a commit that was ultimately dropped?{quote}
Not yet; there is just a POC, and I postponed the PR. I think it's better to have 
FLINK-14344 first. After all, master hook triggering is a part of 
{{triggerCheckpoint}}. 

{quote}In the end, what do you think would be an easier/cleaner/better approach 
to solve this?{quote}
I wish I had a perfect one... It seems that making the whole workflow 
asynchronous sometimes complicates the implementation.
BTW, I have an idea of encapsulating the checkpoint lifecycle in a finite state 
machine model. When the state transitions from {{TRIGGERING}} to {{SNAPSHOTTING}} 
(normally) or to {{FAILED}} (exceptionally), it re-checks the queue. In this way, 
there are only a few entrances that do this re-checking, and the code might be 
easier to read and maintain. It might alleviate the pain, but it doesn't really 
simplify the approach.

Do you have any better idea?
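To make the queue/re-check idea a bit more concrete, here is a stripped-down sketch (hypothetical names, ignoring the checkpoint/savepoint distinction and all error handling) of the pattern I have in mind:
{code:java}
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;

// Sketch of "queue the request while a trigger is ongoing, re-check the queue
// when the ongoing trigger finishes, successfully or not".
public class TriggerRequestQueue {

    private final Queue<Runnable> pendingRequests = new ArrayDeque<>();
    private final Executor mainThreadExecutor;
    private boolean triggering;

    public TriggerRequestQueue(Executor mainThreadExecutor) {
        this.mainThreadExecutor = mainThreadExecutor;
    }

    /** Called by the periodic trigger or by a savepoint request. */
    public void submit(Runnable triggerAction) {
        mainThreadExecutor.execute(() -> {
            if (triggering) {
                pendingRequests.add(triggerAction);
            } else {
                triggering = true;
                triggerAction.run();
            }
        });
    }

    /** Must be called when the ongoing trigger completes or fails. */
    public void onTriggerFinished() {
        mainThreadExecutor.execute(() -> {
            Runnable next = pendingRequests.poll();
            if (next != null) {
                next.run();         // keep going; we are still "triggering"
            } else {
                triggering = false; // nothing queued, the next submit runs directly
            }
        });
    }
}
{code}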


> Separate checkpoint triggering into stages
> --
>
> Key: FLINK-13905
> URL: https://issues.apache.org/jira/browse/FLINK-13905
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
> Fix For: 1.10.0
>
>
> Currently {{CheckpointCoordinator#triggerCheckpoint}} includes some heavy IO 
> operations. We plan to separate the triggering into different stages. The IO 
> operations are executed in IO threads, while other on-memory operations are 
> not.
> This is a preparation for making all on-memory operations of 
> {{CheckpointCoordinator}} single threaded (in main thread).
> Note that we could not put on-memory operations of triggering into main 
> thread directly now. Because there are still some operations on a heavy lock 
> (coordinator-wide).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-13861) No new checkpoint will be trigged when canceling an expired checkpoint failed

2019-10-29 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16962091#comment-16962091
 ] 

Biao Liu commented on FLINK-13861:
--

Thanks for reporting [~klion26]! It's definitely a critical issue.

I agree with [~pnowojski] that we should not handle this kind of exception. 
There might be a resource leaking or other unexpected scenario (like skipping 
the {{triggerQueuedRequests}}) if we tolerate the exception. A big try catch 
wrapping this canceller might be a good choice. Any statement of the 
cancellation should not cause an exception here, not only the 
{{failPendingCheckpoint}}. 

I'm also interested in the root cause of this issue. I would check the relevant 
codes later to search for clues.

Regarding to the potential conflict, I could take care of that :) So please do 
the fixing if you would like to, I could help reviewing if you need.
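To make that concrete, the canceller could be hardened roughly like this (same shape as the snippet quoted in the description below, with the bookkeeping moved into a finally block; just a sketch, not a complete fix):
{code:java}
// Sketch: never let the canceller throw out of the scheduled task, and make
// sure the bookkeeping and the follow-up trigger run even if failing the
// pending checkpoint throws.
final Runnable canceller = () -> {
    synchronized (lock) {
        // only do the work if the checkpoint is not discarded anyways
        if (!checkpoint.isDiscarded()) {
            LOG.info("Checkpoint {} of job {} expired before completing.", checkpointID, job);
            try {
                failPendingCheckpoint(checkpoint, CheckpointFailureReason.CHECKPOINT_EXPIRED);
            } catch (Throwable t) {
                LOG.warn("Error while failing expired checkpoint {}.", checkpointID, t);
            } finally {
                pendingCheckpoints.remove(checkpointID);
                rememberRecentCheckpointId(checkpointID);
                triggerQueuedRequests();
            }
        }
    }
};
{code}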

> No new checkpoint will be trigged when canceling an expired checkpoint failed
> -
>
> Key: FLINK-13861
> URL: https://issues.apache.org/jira/browse/FLINK-13861
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.7.2, 1.8.1, 1.9.0
>Reporter: Congxian Qiu(klion26)
>Priority: Major
> Fix For: 1.10.0
>
>
> I encountered this problem in our private fork of Flink; after taking a look 
> at the current master branch of Apache Flink, I think the problem exists here 
> as well.
> Problem Detail:
>  1. A checkpoint is canceled because of expiration, so the canceller below 
> will be called
> {code:java}
> final Runnable canceller = () -> {
>synchronized (lock) {
>   // only do the work if the checkpoint is not discarded anyways
>   // note that checkpoint completion discards the pending checkpoint 
> object
>   if (!checkpoint.isDiscarded()) {
>  LOG.info("Checkpoint {} of job {} expired before completing.", 
> checkpointID, job);
>  failPendingCheckpoint(checkpoint, 
> CheckpointFailureReason.CHECKPOINT_EXPIRED);
>  pendingCheckpoints.remove(checkpointID);
>  rememberRecentCheckpointId(checkpointID);
>  triggerQueuedRequests();
>   }
>}
> };{code}
>  
>  But failPendingCheckpoint may throw exceptions because it will call
> {{CheckpointCoordinator#failPendingCheckpoint}}
> -> {{PendingCheckpoint#abort}}
> ->  {{PendingCheckpoint#reportFailedCheckpoint}}
> -> initializes a FailedCheckpointStates, which may throw an exception via 
> {{checkArgument}} 
> I did not find out more about why the {{checkArgument}} ever failed (this 
> problem did not reproduce frequently); I will create an issue for that if I 
> have more findings.
>  
> 2. When triggering the next checkpoint, we'll first check whether there are 
> already too many pending checkpoints, as shown below
> {code:java}
> private void checkConcurrentCheckpoints() throws CheckpointException {
>if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
>   triggerRequestQueued = true;
>   if (currentPeriodicTrigger != null) {
>  currentPeriodicTrigger.cancel(false);
>  currentPeriodicTrigger = null;
>   }
>   throw new 
> CheckpointException(CheckpointFailureReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
>}
> }
> {code}
> the {{pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts}} check will 
> always be true
> 3. no checkpoint will ever be triggered from then on.
>  Because {{failPendingCheckpoint}} may throw an exception, we may place the 
> removal of the pending checkpoint in a finally block.
> I'd like to file a PR for this if it really needs to be fixed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-13848) Support “scheduleAtFixedRate/scheduleAtFixedDelay” in RpcEndpoint#MainThreadExecutor

2019-10-29 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16961732#comment-16961732
 ] 

Biao Liu commented on FLINK-13848:
--

{quote}> Anyway it's no harm to enrich the MainThreadExecutor.

Kind of there is - having to maintain unused feature.{quote}

Strictly speaking, that's true :)
The good news is that the periodic scheduling methods are already introduced in 
`MainThreadExecutor`; they are just not implemented yet. So it's kind of like 
implementing something that was already planned.
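Just to illustrate the kind of implementation I have in mind (a sketch only, not the actual {{MainThreadExecutor}} code): fixed-delay scheduling can be built on top of a one-shot delayed schedule by re-scheduling the task after each run. In the RpcEndpoint case the one-shot schedule would of course go through the main thread executor rather than a plain {{ScheduledExecutorService}}.
{code:java}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch: scheduleWithFixedDelay built on top of a one-shot schedule() by
// re-scheduling the task after the current run has finished.
public class FixedDelayScheduler {

    private final ScheduledExecutorService delayService =
            Executors.newSingleThreadScheduledExecutor();

    private volatile boolean cancelled;

    public void scheduleWithFixedDelay(Runnable task, long initialDelayMs, long delayMs) {
        delayService.schedule(() -> runAndReschedule(task, delayMs), initialDelayMs, TimeUnit.MILLISECONDS);
    }

    private void runAndReschedule(Runnable task, long delayMs) {
        if (cancelled) {
            return;
        }
        try {
            task.run();
        } finally {
            // Fixed-delay semantics: the next run is scheduled only after the
            // current one has finished.
            delayService.schedule(() -> runAndReschedule(task, delayMs), delayMs, TimeUnit.MILLISECONDS);
        }
    }

    public void cancel() {
        cancelled = true;
        delayService.shutdownNow();
    }
}
{code}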

> Support “scheduleAtFixedRate/scheduleAtFixedDelay” in 
> RpcEndpoint#MainThreadExecutor
> 
>
> Key: FLINK-13848
> URL: https://issues.apache.org/jira/browse/FLINK-13848
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently the methods “scheduleAtFixedRate/scheduleAtFixedDelay" of 
> {{RpcEndpoint#MainThreadExecutor}} are not implemented. Because there was no 
> requirement on them before.
> Now we are planning to implement these methods to support periodic checkpoint 
> triggering.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14344) Snapshot master hook state asynchronously

2019-10-28 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16961729#comment-16961729
 ] 

Biao Liu commented on FLINK-14344:
--

[~pnowojski], thanks a lot for feedback.

{quote}After the refactor, it would block whole JobManager, right?{quote}
Yes, you are right. It might cause a performance issue if the hook executes a 
time-consuming operation.

I have taken a look at the Pravega connector code. The current implementation 
of {{MasterTriggerRestoreHook.triggerCheckpoint}} is executed asynchronously 
with an internal {{ScheduledExecutorService}}, not with the provided IO 
executor. That's acceptable for us. I have left a message for the maintainers of 
that project [1] and hope they can give me some feedback.

Regarding the "two possible semantics": I'm afraid we have to choose option 2 
due to compatibility. There is a comment on 
{{MasterTriggerRestoreHook.triggerCheckpoint}}:
{quote}This method is called by the checkpoint coordinator prior when 
triggering a checkpoint, prior to sending the "trigger checkpoint" messages to 
the source tasks{quote}

[1] https://github.com/pravega/flink-connectors/issues/287

> Snapshot master hook state asynchronously
> -
>
> Key: FLINK-14344
> URL: https://issues.apache.org/jira/browse/FLINK-14344
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently we snapshot the master hook state synchronously. As a part of 
> reworking threading model of {{CheckpointCoordinator}}, we have to make this 
> non-blocking to satisfy the requirement of running in main thread.
> The behavior of snapshotting master hook state is similar to task state 
> snapshotting. Master state snapshotting is taken before task state 
> snapshotting. Because in master hook, there might be external system 
> initialization which task state snapshotting might depend on.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-13848) Support “scheduleAtFixedRate/scheduleAtFixedDelay” in RpcEndpoint#MainThreadExecutor

2019-10-28 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16961152#comment-16961152
 ] 

Biao Liu commented on FLINK-13848:
--

[~pnowojski],
{quote}Do I understand it correctly, that in the current code base this is not 
an issue, because all of the A, B and C are executed synchronously from one 
method?
{quote}
Yes, you are right, it's not an issue with the current implementation. But it 
will become an issue once we make {{triggerCheckpoint}} asynchronous.
{quote}have you thought how to resolve the issue from you previous comment 
(posted on 30/Aug/19 04:52)? With triggering next "A" only after the previous 
"C" completes?
{quote}
That's a good question. I would like to talk about it, although I believe it's 
an orthogonal issue. It indeed annoys me and makes the implementation more 
complicated.

First of all, we have to guarantee that the workflow is "A -> B -> C -> A ...", 
because the checkpoint IDs sent to the tasks must be monotonically increasing. 
There are two kinds of triggering logic: periodic checkpoints and savepoints. 
We could avoid competition between periodic checkpoints by triggering the next 
checkpoint manually or by failing the later one. But we can't avoid competition 
between savepoints and checkpoints: a savepoint can be triggered at any time and 
it can't simply be failed.

In brief, my solution is to introduce a queue of trigger requests. If the prior 
trigger request has not finished, later ones (checkpoints as well as savepoints) 
are kept in this queue. The queue is checked once the prior trigger request 
finishes (see the sketch below). This will be part of 
https://issues.apache.org/jira/browse/FLINK-13905.
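A minimal sketch of the idea (hypothetical names, not the FLINK-13905 
implementation, and assuming all methods are called from the main thread): while 
one trigger request is in flight, later requests are enqueued and drained one at 
a time when the in-flight request completes.
{code:java}
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical trigger-request queue; all methods are assumed to run in the main thread. */
public class TriggerRequestQueue {

    private final Queue<Supplier<CompletableFuture<Void>>> pendingRequests = new ArrayDeque<>();
    private boolean triggerInFlight;

    /** Called for both periodic checkpoints and savepoints. */
    public void submit(Supplier<CompletableFuture<Void>> triggerAction) {
        if (triggerInFlight) {
            // the prior request has not finished its A -> B -> C workflow yet; queue this one
            pendingRequests.add(triggerAction);
        } else {
            start(triggerAction);
        }
    }

    private void start(Supplier<CompletableFuture<Void>> triggerAction) {
        triggerInFlight = true;
        // in a real implementation the callback must be re-posted to the main thread
        triggerAction.get().whenComplete((ignored, failure) -> onTriggerFinished());
    }

    private void onTriggerFinished() {
        triggerInFlight = false;
        Supplier<CompletableFuture<Void>> next = pendingRequests.poll();
        if (next != null) {
            // drain the next queued request, preserving submission order
            start(next);
        }
    }
}
{code}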

> Support “scheduleAtFixedRate/scheduleAtFixedDelay” in 
> RpcEndpoint#MainThreadExecutor
> 
>
> Key: FLINK-13848
> URL: https://issues.apache.org/jira/browse/FLINK-13848
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently the methods “scheduleAtFixedRate/scheduleAtFixedDelay" of 
> {{RpcEndpoint#MainThreadExecutor}} are not implemented. Because there was no 
> requirement on them before.
> Now we are planning to implement these methods to support periodic checkpoint 
> triggering.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14308) Kafka09ProducerITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceCustomOperator fails on Travis

2019-10-21 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16956600#comment-16956600
 ] 

Biao Liu commented on FLINK-14308:
--

Another instance, https://travis-ci.com/flink-ci/flink/jobs/247867376

> Kafka09ProducerITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceCustomOperator
>  fails on Travis
> -
>
> Key: FLINK-14308
> URL: https://issues.apache.org/jira/browse/FLINK-14308
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.10.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.10.0
>
>
> The 
> {{Kafka09ProducerITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceCustomOperator}}
>  fails on Travis with
> {code}
> Test 
> testOneToOneAtLeastOnceCustomOperator(org.apache.flink.streaming.connectors.kafka.Kafka09ProducerITCase)
>  failed with:
> java.lang.AssertionError: Expected to contain all of: <[0]>, but was: <[]>
>   at org.junit.Assert.fail(Assert.java:88)
>   at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestBase.assertAtLeastOnceForTopic(KafkaTestBase.java:235)
>   at 
> org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase.testOneToOneAtLeastOnce(KafkaProducerTestBase.java:289)
>   at 
> org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase.testOneToOneAtLeastOnceCustomOperator(KafkaProducerTestBase.java:214)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
> {code}
> https://api.travis-ci.com/v3/job/240747188/log.txt



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-13848) Support “scheduleAtFixedRate/scheduleAtFixedDelay” in RpcEndpoint#MainThreadExecutor

2019-10-21 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16956009#comment-16956009
 ] 

Biao Liu commented on FLINK-13848:
--

I would like to reactivate this ticket.

There is a discussion (in another PR) about whether manually scheduling the 
trigger in {{CheckpointCoordinator}} is a good approach. We probably still need 
periodic scheduling in the main thread.

Anyway it's no harm to enrich the {{MainThreadExecutor}}.

> Support “scheduleAtFixedRate/scheduleAtFixedDelay” in 
> RpcEndpoint#MainThreadExecutor
> 
>
> Key: FLINK-13848
> URL: https://issues.apache.org/jira/browse/FLINK-13848
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently the methods “scheduleAtFixedRate/scheduleAtFixedDelay" of 
> {{RpcEndpoint#MainThreadExecutor}} are not implemented. Because there was no 
> requirement on them before.
> Now we are planning to implement these methods to support periodic checkpoint 
> triggering.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14344) Snapshot master hook state asynchronously

2019-10-14 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16950976#comment-16950976
 ] 

Biao Liu commented on FLINK-14344:
--

Hi [~pnowojski],
{quote}I think ideally I would prefer to make a contract that sync master hooks 
should be non blocking executed in the main thread. Async hooks could also be 
executed by the main thread and user should take care of spawning/re-using his 
own thread to actually execute the async work (just as in AsyncWaitOperator). 
If user executes blocking code, let him shoot himself in the foot...{quote}
It makes sense to me. So there are three options now:
1. {{MasterState syncSnapshotHook(...)}}
2. {{CompletableFuture asyncSnapshotHook(ioExecutor)}}
3. {{CompletableFuture asyncSnapshotHook()}}

The only difference between options 2 and 3 is whether we provide an IO executor 
or not. I tend to choose option 2, but option 3 is also acceptable to me.
{quote}unless we schedule periodic actions always in some separate thread, 
first think we do is to execute the hooks in that thread, and only after 
execute the hooks, we enqueue follow up work in the main thread?{quote}
I think we should schedule periodic actions in the main thread as planned. 
That's one of the biggest goals of this reworking: making it single-threaded and 
lock-free (removing the trigger/coordinator-wide lock).
{quote}...we would have to make sure that none of the CheckpointCoordinator 
actions will be triggered until that other thread finish its work.{quote}
I think we have to face this problem. The master hook is designed to perform IO 
operations, and we can't wait for the result synchronously. It should be handled 
with a {{CompletableFuture}} and the main thread executor (see the sketch of 
option 2 below).
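A minimal sketch of what option 2 could look like (hypothetical names, not an 
actual Flink API): the hook is invoked from the main thread, must not block, and 
pushes any blocking IO onto the provided executor, surfacing the result through 
the returned future.
{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical shape of option 2: an asynchronous master-hook snapshot call. */
public interface AsyncMasterSnapshotHook<T> {

    /**
     * Called from the main thread; must not block. Blocking IO should be
     * submitted to the given ioExecutor and surfaced through the returned future.
     */
    CompletableFuture<T> asyncSnapshotHook(long checkpointId, Executor ioExecutor);
}

/** Example implementation that offloads the blocking work onto the IO executor. */
class ExternalSystemHook implements AsyncMasterSnapshotHook<byte[]> {
    @Override
    public CompletableFuture<byte[]> asyncSnapshotHook(long checkpointId, Executor ioExecutor) {
        return CompletableFuture.supplyAsync(() -> {
            // a potentially blocking call against an external system would go here
            return new byte[0];
        }, ioExecutor);
    }
}
{code}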

> Snapshot master hook state asynchronously
> -
>
> Key: FLINK-14344
> URL: https://issues.apache.org/jira/browse/FLINK-14344
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently we snapshot the master hook state synchronously. As a part of 
> reworking threading model of {{CheckpointCoordinator}}, we have to make this 
> non-blocking to satisfy the requirement of running in main thread.
> The behavior of snapshotting master hook state is similar to task state 
> snapshotting. Master state snapshotting is taken before task state 
> snapshotting. Because in master hook, there might be external system 
> initialization which task state snapshotting might depend on.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14344) Snapshot master hook state asynchronously

2019-10-11 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16949578#comment-16949578
 ] 

Biao Liu commented on FLINK-14344:
--

Hi [~pnowojski], thanks for feedback.

Regarding the current code, yes, you are right. It confuses me as well. There is 
a comment before the waiting part of {{MasterHooks#triggerMasterHooks}}. It says 
"in the future we want to make this asynchronous with futures (no pun intended)". 
I take that to mean it should be asynchronous but simply hasn't been made so for 
some reason. So I guess it is meant to be asynchronous by design.
{quote}I'm not sure why should we execute the hooks in the IO Executor, why not 
master thread?
{quote}
Actually, at the very beginning I thought it should be executed in the main 
thread. However, I found a comment on 
{{MasterTriggerRestoreHook#triggerCheckpoint}}: "If the action by this hook 
needs to be executed synchronously, then this method should directly execute 
the action synchronously and block until it is complete". Based on this 
description, I'm afraid there is a risk of executing a blocking operation in the 
main thread. So I tried to execute it in the IO thread instead, but that brings 
another risk: deadlock.

It's hard to make the right decision without clear semantics. I have started a 
survey on both the dev and user mailing lists to find out how developers and 
users use this interface. There has been no response yet; I guess not too many 
users depend on it.

If there are no opposing opinions in the next few days, I intend to treat the 
hook as asynchronous and execute it in the main thread. The Javadoc would then 
need to be changed: remove the part about "blocking until it is complete" and 
emphasize that the method should be non-blocking.

If there are opposing voices and we can't reach an agreement, we could make it 
synchronous by changing the method signature (removing the executor and the 
completable future).

What do you think?

> Snapshot master hook state asynchronously
> -
>
> Key: FLINK-14344
> URL: https://issues.apache.org/jira/browse/FLINK-14344
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
> Fix For: 1.10.0
>
>
> Currently we snapshot the master hook state synchronously. As a part of 
> reworking threading model of {{CheckpointCoordinator}}, we have to make this 
> non-blocking to satisfy the requirement of running in main thread.
> The behavior of snapshotting master hook state is similar to task state 
> snapshotting. Master state snapshotting is taken before task state 
> snapshotting. Because in master hook, there might be external system 
> initialization which task state snapshotting might depend on.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14344) Snapshot master hook state asynchronously

2019-10-09 Thread Biao Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-14344:
-
Description: 
Currently we snapshot the master hook state synchronously. As a part of 
reworking threading model of {{CheckpointCoordinator}}, we have to make this 
non-blocking to satisfy the requirement of running in main thread.

The behavior of snapshotting master hook state is similar to task state 
snapshotting. Master state snapshotting is taken before task state 
snapshotting. Because in master hook, there might be external system 
initialization which task state snapshotting might depend on.

  was:
Currently we snapshot the master hook state synchronously. As a part of 
reworking threading model of {{CheckpointCoordinator}}, we have to make this 
non-blocking to satisfy the requirement of running in main thread.

The behavior of snapshotting master hook state is similar to task state 
snapshotting. Master state snapshotting is taken before task state 
snapshotting. Because in master hook, there might be external system 
initialization which task state snapshotting might depends on.



> Snapshot master hook state asynchronously
> -
>
> Key: FLINK-14344
> URL: https://issues.apache.org/jira/browse/FLINK-14344
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Reporter: Biao Liu
>Priority: Major
> Fix For: 1.10.0
>
>
> Currently we snapshot the master hook state synchronously. As a part of 
> reworking threading model of {{CheckpointCoordinator}}, we have to make this 
> non-blocking to satisfy the requirement of running in main thread.
> The behavior of snapshotting master hook state is similar to task state 
> snapshotting. Master state snapshotting is taken before task state 
> snapshotting. Because in master hook, there might be external system 
> initialization which task state snapshotting might depend on.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14344) Snapshot master hook state asynchronously

2019-10-09 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16947662#comment-16947662
 ] 

Biao Liu commented on FLINK-14344:
--

When I started implementing this part, I realized there is one more thing that 
needs to be discussed. I would like to make the semantics of 
{{MasterTriggerRestoreHook#triggerCheckpoint}} clearer.

At the moment {{MasterTriggerRestoreHook}} is not an official public interface 
(at least from the perspective of the code). I'm not sure how big the side 
effects might be if I changed the interface or its behavior.

The problem is that the interface currently states (in its Javadoc) that both 
synchronous and asynchronous implementations are OK. This makes things more 
complex; I can't figure out a proper asynchronous way to handle both scenarios 
at the same time.

I was planning to execute {{MasterTriggerRestoreHook#triggerCheckpoint}} in the 
IO executor directly. That's fine for the synchronous scenario, but there can be 
a deadlock in the asynchronous scenario. Currently the IO executor is passed to 
the master hook as an input parameter. If we execute the hook in the IO 
executor, and the implementation of 
{{MasterTriggerRestoreHook#triggerCheckpoint}} launches an asynchronous 
operation and then waits for its result for some reason, we may deadlock if that 
asynchronous operation is scheduled on the same IO thread: the waiting blocks 
the later asynchronous operation, so it can never finish.
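A minimal, self-contained illustration of that deadlock pattern (not Flink code; 
the single-threaded pool below stands in for the IO executor that is handed to 
the hook): the outer task occupies the only IO thread and blocks on a future 
whose completing task is queued behind it on the same executor, so neither can 
ever make progress.
{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class IoExecutorDeadlockDemo {
    public static void main(String[] args) {
        // stands in for the IO executor that is handed to the master hook
        ExecutorService ioExecutor = Executors.newSingleThreadExecutor();

        ioExecutor.submit(() -> {
            // the "hook" launches an async operation on the same executor ...
            CompletableFuture<String> async =
                    CompletableFuture.supplyAsync(() -> "done", ioExecutor);
            // ... and then blocks waiting for it; the only IO thread is busy
            // executing this very task, so the async task can never run.
            System.out.println(async.join());   // deadlock: never completes
        });

        System.out.println("submitted; the hook task above will hang forever");
        // the executor is intentionally not shut down, to keep the demo minimal
    }
}
{code}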

IMO synchronous and asynchronous interfaces should be different and should be 
treated in different ways.
 # The synchronous invocation returns the value directly, not a completable 
future. The method is executed in the IO thread under a proper lock (which could 
be the hook itself) and could be launched by {{CheckpointCoordinator}} directly. 
The implementation of {{MasterTriggerRestoreHook}} would not see the IO executor 
at all. However, this way we break compatibility.
 # The asynchronous invocation returns a completable future, just like the 
current interface. The {{MasterTriggerRestoreHook#triggerCheckpoint}} method 
itself performs no IO; all heavy IO operations are executed in the IO executor 
that is given to the master hook as an input parameter. This asynchronous 
interface also has an advantage: we avoid contention on all methods of 
{{MasterTriggerRestoreHook}} (they run in the main thread) except the real 
asynchronous part executed in the IO thread (the user must guarantee it is 
thread-safe or protected by a proper lock). In this way we keep compatibility on 
the surface, although the behavior changes somewhat. We could emphasize the 
change in the Javadoc and the release notes.

If nobody is sure about the side effects, I could start a survey or a discussion 
on the mailing list. What do you think? Any feedback is appreciated.
 cc [~sewen], [~trohrmann], [~pnowojski]

> Snapshot master hook state asynchronously
> -
>
> Key: FLINK-14344
> URL: https://issues.apache.org/jira/browse/FLINK-14344
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Reporter: Biao Liu
>Priority: Major
> Fix For: 1.10.0
>
>
> Currently we snapshot the master hook state synchronously. As a part of 
> reworking threading model of {{CheckpointCoordinator}}, we have to make this 
> non-blocking to satisfy the requirement of running in main thread.
> The behavior of snapshotting master hook state is similar to task state 
> snapshotting. Master state snapshotting is taken before task state 
> snapshotting. Because in master hook, there might be external system 
> initialization which task state snapshotting might depends on.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14344) Snapshot master hook state asynchronously

2019-10-09 Thread Biao Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-14344:
-
Description: 
Currently we snapshot the master hook state synchronously. As a part of 
reworking threading model of {{CheckpointCoordinator}}, we have to make this 
non-blocking to satisfy the requirement of running in main thread.

The behavior of snapshotting master hook state is similar to task state 
snapshotting. Master state snapshotting is taken before task state 
snapshotting. Because in master hook, there might be external system 
initialization which task state snapshotting might depends on.


  was:
Currently we snapshot the master hook state synchronously. As a part of 
reworking threading model of {{CheckpointCoordinator}}, we have to make this 
non-blocking to satisfy the requirement of running in main thread.

The behavior of snapshotting master hook state should be similar to task state 
snapshotting. It should be launched after \{{PendingCheckpoint}} created. It 
could complete or fail the {{PendingCheckpoint}} like task state snapshotting. 


> Snapshot master hook state asynchronously
> -
>
> Key: FLINK-14344
> URL: https://issues.apache.org/jira/browse/FLINK-14344
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Reporter: Biao Liu
>Priority: Major
> Fix For: 1.10.0
>
>
> Currently we snapshot the master hook state synchronously. As a part of 
> reworking threading model of {{CheckpointCoordinator}}, we have to make this 
> non-blocking to satisfy the requirement of running in main thread.
> The behavior of snapshotting master hook state is similar to task state 
> snapshotting. Master state snapshotting is taken before task state 
> snapshotting. Because in master hook, there might be external system 
> initialization which task state snapshotting might depends on.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

