[jira] [Commented] (FLINK-13848) Support “scheduleAtFixedRate/scheduleAtFixedDelay” in RpcEndpoint#MainThreadExecutor

2019-08-29 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16918535#comment-16918535
 ] 

Biao Liu commented on FLINK-13848:
--

Hi [~till.rohrmann],
 I just found that this issue does not satisfy our requirement. In the discussion 
on the design doc, we want to avoid competition between different triggers; the 
target is to process the triggers one by one. {{"scheduleAtFixedDelay"}} seemed 
to be the way we want, since it schedules the {{runnable}} executions one after 
the other. But here the {{runnable}} of triggering is asynchronous: the 
{{runnable}} finishing does not mean the triggering has also finished.

I think we have to go back to the original plan of doing the triggering manually 
(triggering the next one when the future of the prior trigger is done), not 
through {{MainThreadExecutor#scheduleAtFixedDelay}}.
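
A minimal sketch of the pitfall with plain JDK executors (hypothetical names, only for illustration, not the actual coordinator code):

{code:java}
import java.util.concurrent.*;

public class FixedDelayPitfall {

    // Hypothetical stand-in for the asynchronous triggering: it only *starts* the
    // work and returns a future that completes later on another thread.
    static CompletableFuture<Void> startTriggering(Executor ioExecutor) {
        return CompletableFuture.runAsync(FixedDelayPitfall::heavyWork, ioExecutor);
    }

    static void heavyWork() {
        try { Thread.sleep(3_000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        ExecutorService ioExecutor = Executors.newCachedThreadPool();

        // The scheduled Runnable returns as soon as the triggering has been started,
        // so with a 1s delay the next round begins while the previous 3s round is
        // still in flight: the fixed delay does not serialize the triggerings.
        scheduler.scheduleWithFixedDelay(() -> startTriggering(ioExecutor), 0, 1, TimeUnit.SECONDS);
    }
}
{code}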

> Support “scheduleAtFixedRate/scheduleAtFixedDelay” in 
> RpcEndpoint#MainThreadExecutor
> 
>
> Key: FLINK-13848
> URL: https://issues.apache.org/jira/browse/FLINK-13848
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Biao Liu
>Priority: Major
> Fix For: 1.10.0
>
>
> Currently the methods "scheduleAtFixedRate/scheduleAtFixedDelay" of 
> {{RpcEndpoint#MainThreadExecutor}} are not implemented, because there was no 
> requirement for them before.
> Now we are planning to implement these methods to support periodic checkpoint 
> triggering.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-13848) Support “scheduleAtFixedRate/scheduleAtFixedDelay” in RpcEndpoint#MainThreadExecutor

2019-08-29 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16919138#comment-16919138
 ] 

Biao Liu commented on FLINK-13848:
--

[~till.rohrmann]

Sorry I didn't describe it clearly.

In this refactoring, {{triggerCheckpoint}} would be separated into several 
stages, for example A -> B -> C. A is "start triggering", running in the main 
thread. B does some heavy initializations, running in the IO threads. C sends 
the trigger messages to the tasks, running in the main thread. The workflow 
should be:
 # First A is triggered and executed in the main thread. A launches B in the IO 
threads and returns immediately.
 # B is executed in an IO thread. When it finishes, a callback that triggers C 
is executed (maybe through {{CompletableFuture#thenRunAsync}} with the main 
thread executor).
 # C is executed in the main thread. Afterwards the next A is scheduled after a 
delay.

It's A -> B -> C -> A ... There would be no competition. That's what we really 
want.
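
A minimal sketch of that chaining with plain JDK executors (hypothetical names, only to illustrate the idea, not the actual {{CheckpointCoordinator}} code):

{code:java}
import java.util.concurrent.*;

public class StagedTriggeringSketch {

    // Hypothetical stand-ins for the RPC main thread executor, the IO executor and a timer.
    static final ExecutorService mainThreadExecutor = Executors.newSingleThreadExecutor();
    static final ExecutorService ioExecutor = Executors.newCachedThreadPool();
    static final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

    // A: runs in the main thread, launches B and returns immediately.
    static void triggerOneRound() {
        CompletableFuture
                // B: heavy initialization in the IO threads.
                .runAsync(StagedTriggeringSketch::heavyInitialization, ioExecutor)
                // C: send the trigger messages to the tasks, back in the main thread.
                .thenRunAsync(StagedTriggeringSketch::sendTriggerMessages, mainThreadExecutor)
                // Only after C is done is the next A scheduled (after a delay),
                // so the rounds are strictly A -> B -> C -> A ... with no overlap.
                .thenRun(() -> timer.schedule(
                        () -> mainThreadExecutor.execute(StagedTriggeringSketch::triggerOneRound),
                        10, TimeUnit.SECONDS));
    }

    static void heavyInitialization() { /* placeholder for the blocking preparation work */ }

    static void sendTriggerMessages() { /* placeholder for the RPC calls to the tasks */ }

    public static void main(String[] args) {
        mainThreadExecutor.execute(StagedTriggeringSketch::triggerOneRound);
    }
}
{code}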

But with the {{scheduleAtFixedDelay}} way, what we schedule is just A. The 
next A would be scheduled once the prior A has finished, i.e. possibly before 
B and C have finished:
 It's A -> B -> C
         \
           -> A -> B -> C

So I think we should abandon this ticket, or convert it into a normal issue that 
is not a subtask of this refactoring. What do you think?

> Support “scheduleAtFixedRate/scheduleAtFixedDelay” in 
> RpcEndpoint#MainThreadExecutor
> 
>
> Key: FLINK-13848
> URL: https://issues.apache.org/jira/browse/FLINK-13848
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Biao Liu
>Priority: Major
> Fix For: 1.10.0
>
>
> Currently the methods "scheduleAtFixedRate/scheduleAtFixedDelay" of 
> {{RpcEndpoint#MainThreadExecutor}} are not implemented, because there was no 
> requirement for them before.
> Now we are planning to implement these methods to support periodic checkpoint 
> triggering.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-13904) Avoid competition between different rounds of checkpoint triggering

2019-08-29 Thread Biao Liu (Jira)
Biao Liu created FLINK-13904:


 Summary: Avoid competition between different rounds of checkpoint 
triggering
 Key: FLINK-13904
 URL: https://issues.apache.org/jira/browse/FLINK-13904
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Biao Liu
 Fix For: 1.10.0


As a part of {{CheckpointCoordinator}} refactoring, I'd like to simplify the 
concurrent triggering logic.
The different rounds of checkpoint triggering would be processed sequentially. 
The final target is to get rid of the timer thread and the {{triggerLock}}.

Note that we can't avoid all competition between triggerings for now. There is 
still a competition between normal checkpoint triggering and savepoint 
triggering. We could avoid it by executing the triggering in the main thread, 
but that cannot be achieved until all blocking operations are handled well in 
the IO threads.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-13905) Separate checkpoint triggering into stages

2019-08-29 Thread Biao Liu (Jira)
Biao Liu created FLINK-13905:


 Summary: Separate checkpoint triggering into stages
 Key: FLINK-13905
 URL: https://issues.apache.org/jira/browse/FLINK-13905
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Biao Liu
 Fix For: 1.10.0


Currently {{CheckpointCoordinator#triggerCheckpoint}} includes some heavy IO 
operations. We plan to separate the triggering into different stages: the IO 
operations are executed in the IO threads, while the other, in-memory operations 
are not.

This is a preparation for making all in-memory operations of 
{{CheckpointCoordinator}} single-threaded (in the main thread).
Note that we cannot put the in-memory operations of triggering directly into the 
main thread yet, because some operations still run under a heavy 
(coordinator-wide) lock.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-12164) JobMasterTest.testJobFailureWhenTaskExecutorHeartbeatTimeout is unstable

2019-08-30 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-12164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16919314#comment-16919314
 ] 

Biao Liu commented on FLINK-12164:
--

Hi [~aljoscha],
Sorry, I forgot to update the progress. I'm not working on it at the moment. I 
have abandoned my prior PR. I was planning to implement a better one (with less 
mocking), but recently I haven't found the time to do so :(
If it becomes a blocker, I could postpone other things and get back to this 
issue first.


> JobMasterTest.testJobFailureWhenTaskExecutorHeartbeatTimeout is unstable
> 
>
> Key: FLINK-12164
> URL: https://issues.apache.org/jira/browse/FLINK-12164
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0
>Reporter: Aljoscha Krettek
>Assignee: Biao Liu
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.10.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> {code}
> 07:28:23.957 [ERROR] Tests run: 24, Failures: 0, Errors: 1, Skipped: 0, Time 
> elapsed: 8.968 s <<< FAILURE! - in 
> org.apache.flink.runtime.jobmaster.JobMasterTest
> 07:28:23.957 [ERROR] 
> testJobFailureWhenTaskExecutorHeartbeatTimeout(org.apache.flink.runtime.jobmaster.JobMasterTest)
>   Time elapsed: 0.177 s  <<< ERROR!
> java.util.concurrent.ExecutionException: java.lang.Exception: Unknown 
> TaskManager 69a7c8c18a36069ff90a1eae8ec41066
>   at 
> org.apache.flink.runtime.jobmaster.JobMasterTest.registerSlotsAtJobMaster(JobMasterTest.java:1746)
>   at 
> org.apache.flink.runtime.jobmaster.JobMasterTest.runJobFailureWhenTaskExecutorTerminatesTest(JobMasterTest.java:1670)
>   at 
> org.apache.flink.runtime.jobmaster.JobMasterTest.testJobFailureWhenTaskExecutorHeartbeatTimeout(JobMasterTest.java:1630)
> Caused by: java.lang.Exception: Unknown TaskManager 
> 69a7c8c18a36069ff90a1eae8ec41066
> {code}



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-12164) JobMasterTest.testJobFailureWhenTaskExecutorHeartbeatTimeout is unstable

2019-08-30 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-12164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16919496#comment-16919496
 ] 

Biao Liu commented on FLINK-12164:
--

I just finished the PR.
[~till.rohrmann], could you take a look at it?

> JobMasterTest.testJobFailureWhenTaskExecutorHeartbeatTimeout is unstable
> 
>
> Key: FLINK-12164
> URL: https://issues.apache.org/jira/browse/FLINK-12164
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0
>Reporter: Aljoscha Krettek
>Assignee: Biao Liu
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.10.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> {code}
> 07:28:23.957 [ERROR] Tests run: 24, Failures: 0, Errors: 1, Skipped: 0, Time 
> elapsed: 8.968 s <<< FAILURE! - in 
> org.apache.flink.runtime.jobmaster.JobMasterTest
> 07:28:23.957 [ERROR] 
> testJobFailureWhenTaskExecutorHeartbeatTimeout(org.apache.flink.runtime.jobmaster.JobMasterTest)
>   Time elapsed: 0.177 s  <<< ERROR!
> java.util.concurrent.ExecutionException: java.lang.Exception: Unknown 
> TaskManager 69a7c8c18a36069ff90a1eae8ec41066
>   at 
> org.apache.flink.runtime.jobmaster.JobMasterTest.registerSlotsAtJobMaster(JobMasterTest.java:1746)
>   at 
> org.apache.flink.runtime.jobmaster.JobMasterTest.runJobFailureWhenTaskExecutorTerminatesTest(JobMasterTest.java:1670)
>   at 
> org.apache.flink.runtime.jobmaster.JobMasterTest.testJobFailureWhenTaskExecutorHeartbeatTimeout(JobMasterTest.java:1630)
> Caused by: java.lang.Exception: Unknown TaskManager 
> 69a7c8c18a36069ff90a1eae8ec41066
> {code}



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-4399) Add support for oversized messages

2019-09-02 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-4399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16920867#comment-16920867
 ] 

Biao Liu commented on FLINK-4399:
-

Hi [~till.rohrmann], thanks for the feedback!

{quote}Instead, it could be enough to add some utilities to offload large 
payloads to the BlobServer and then handle all message which can carry a large 
user code payload on a higher level as we do with the 
TaskDeploymentDescriptor.{quote}
I can't agree more. I don't think using the RPC system to handle large messages 
is best practice. Going through the {{BlobServer}} outside the RPC system is a 
good choice to me. I have described the reason in detail in the doc.

{quote}As I've written in the Google doc, I assume that we would also define an 
upper bound for the message size because otherwise it can easily cause OOM 
errors.{quote}
I think you have raised a significant question: should this upper bound exist?

1. The memory issue, e.g. the risk of OOM: if the message has to be 
serialized/deserialized in memory, the risk of OOM can't be avoided even if we 
transfer it through the {{BlobServer}}, because we always have to allocate a big 
buffer for the serialization/deserialization. So I think the key point here is 
whether we serialize/deserialize the large message in memory or not.
2. Could we cover all risks of large messages through the {{BlobServer}}? I 
guess it's hard to do so. For example, accumulators, metrics and source splits 
are usually quite small, far below the upper limit, but they could be large 
depending on the user code. If we used the {{BlobServer}} to transfer all these 
messages, the performance might regress a lot for the common case. And it's 
hard to judge whether a message is large without serializing it.

To sum up, I guess the upper bound on RPC message size is not as important as we 
imagined, though a very large upper bound (maybe 100M+) as a safeguard is 
meaningful. And we can't get rid of all risks through the {{BlobServer}} outside 
the RPC system.

So IMO the best practice for handling large messages is:
1. Use the {{BlobServer}} outside the RPC system for messages that are likely to 
be large, like {{JobGraph}} and {{TaskDeploymentDescriptor}}. Adding some common 
utilities is a good idea.
2. The RPC system could support large messages without an upper bound, or with a 
very large one. The performance might not be good, but that is a performance 
issue the user should tune. Splitting the large message into chunks or allowing 
a larger RPC size limit are both acceptable to me. Splitting messages is 
friendlier to the other small messages queuing in the RPC system, while a larger 
size limit does not need any development.
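
As a rough illustration of the common utilities I have in mind, here is a minimal sketch of the "offload to blob, send a pointer" pattern (hypothetical names; a real implementation would use the {{BlobServer}} and transient blobs rather than a local directory):

{code:java}
import java.io.IOException;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

public class BlobOffloadSketch {

    /** Either the payload itself (small message) or a key pointing to an offloaded blob. */
    static final class MaybeOffloaded implements Serializable {
        final byte[] inlinePayload; // non-null for small messages
        final String blobKey;       // non-null when the payload was offloaded

        MaybeOffloaded(byte[] inlinePayload, String blobKey) {
            this.inlinePayload = inlinePayload;
            this.blobKey = blobKey;
        }
    }

    // Stay well below the RPC frame size; the exact value is an assumption.
    static final int MAX_INLINE_SIZE = 1 << 20;

    /** Sender side: offload the serialized payload if it would exceed the frame size. */
    static MaybeOffloaded prepare(byte[] serializedPayload, Path blobStoreDir) throws IOException {
        if (serializedPayload.length <= MAX_INLINE_SIZE) {
            return new MaybeOffloaded(serializedPayload, null);
        }
        String key = UUID.randomUUID().toString();
        Files.write(blobStoreDir.resolve(key), serializedPayload); // stand-in for a BlobServer upload
        return new MaybeOffloaded(null, key);
    }

    /** Receiver side: resolve the pointer message back into the payload. */
    static byte[] resolve(MaybeOffloaded message, Path blobStoreDir) throws IOException {
        return message.inlinePayload != null
                ? message.inlinePayload
                : Files.readAllBytes(blobStoreDir.resolve(message.blobKey)); // stand-in for a BlobServer download
    }
}
{code}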

 

> Add support for oversized messages
> --
>
> Key: FLINK-4399
> URL: https://issues.apache.org/jira/browse/FLINK-4399
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
> Environment: FLIP-6 feature branch
>Reporter: Stephan Ewen
>Assignee: Biao Liu
>Priority: Major
>  Labels: flip-6
>
> Currently, messages larger than the maximum Akka Framesize cause an error 
> when being transported. We should add a way to pass messages that are larger 
> than the Framesize, as may happen for:
>   - {{collect()}} calls that collect large data sets (via accumulators)
>   - Job submissions and operator deployments where the functions closures are 
> large (for example because it contains large pre-loaded data)
>   - Function restore in cases where restored state is larger than 
> checkpointed state (union state)
> I suggest to use the {{BlobManager}} to transfer large payload.
>   - On the sender side, oversized messages are stored under a transient blob 
> (which is deleted after first retrieval, or after a certain number of minutes)
>   - The sender sends a "pointer to blob message" instead.
>   - The receiver grabs the message from the blob upon receiving the pointer 
> message
> The RPC Service should be optionally initializable with a "large message 
> handler" which is internally the {{BlobManager}}.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-4399) Add support for oversized messages

2019-09-03 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-4399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16921901#comment-16921901
 ] 

Biao Liu commented on FLINK-4399:
-

Hi [~till.rohrmann], I agree it's a critical issue.
I understand there is a ton of work in this release cycle. I really appreciate 
that you could squeeze in time for reviewing & discussing the design.
I don't think this improvement must be included in 1.10. We could give it a low 
priority. Maybe we get back to this issue after finishing the main work of 1.10? 
I could ping you then.

> Add support for oversized messages
> --
>
> Key: FLINK-4399
> URL: https://issues.apache.org/jira/browse/FLINK-4399
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
> Environment: FLIP-6 feature branch
>Reporter: Stephan Ewen
>Assignee: Biao Liu
>Priority: Major
>  Labels: flip-6
>
> Currently, messages larger than the maximum Akka Framesize cause an error 
> when being transported. We should add a way to pass messages that are larger 
> than the Framesize, as may happen for:
>   - {{collect()}} calls that collect large data sets (via accumulators)
>   - Job submissions and operator deployments where the functions closures are 
> large (for example because it contains large pre-loaded data)
>   - Function restore in cases where restored state is larger than 
> checkpointed state (union state)
> I suggest to use the {{BlobManager}} to transfer large payload.
>   - On the sender side, oversized messages are stored under a transient blob 
> (which is deleted after first retrieval, or after a certain number of minutes)
>   - The sender sends a "pointer to blob message" instead.
>   - The receiver grabs the message from the blob upon receiving the pointer 
> message
> The RPC Service should be optionally initializable with a "large message 
> handler" which is internally the {{BlobManager}}.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-4399) Add support for oversized messages

2019-09-04 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-4399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16922307#comment-16922307
 ] 

Biao Liu commented on FLINK-4399:
-

Ah, yes, I mean "it's not a critical issue".

> Add support for oversized messages
> --
>
> Key: FLINK-4399
> URL: https://issues.apache.org/jira/browse/FLINK-4399
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
> Environment: FLIP-6 feature branch
>Reporter: Stephan Ewen
>Assignee: Biao Liu
>Priority: Major
>  Labels: flip-6
>
> Currently, messages larger than the maximum Akka Framesize cause an error 
> when being transported. We should add a way to pass messages that are larger 
> than the Framesize, as may happen for:
>   - {{collect()}} calls that collect large data sets (via accumulators)
>   - Job submissions and operator deployments where the functions closures are 
> large (for example because it contains large pre-loaded data)
>   - Function restore in cases where restored state is larger than 
> checkpointed state (union state)
> I suggest to use the {{BlobManager}} to transfer large payload.
>   - On the sender side, oversized messages are stored under a transient blob 
> (which is deleted after first retrieval, or after a certain number of minutes)
>   - The sender sends a "pointer to blob message" instead.
>   - The receiver grabs the message from the blob upon receiving the pointer 
> message
> The RPC Service should be optionally initializable with a "large message 
> handler" which is internally the {{BlobManager}}.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-13698) Rework threading model of CheckpointCoordinator

2019-09-06 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16924472#comment-16924472
 ] 

Biao Liu commented on FLINK-13698:
--

Just a progress update: I'm doing some POC development now. I found it's a bit 
hard to separate the issue and the work into subtasks, because the changes of 
the subtasks are tightly coupled. So I stopped creating new subtasks to avoid 
having to adjust them during further development.

> Rework threading model of CheckpointCoordinator
> ---
>
> Key: FLINK-13698
> URL: https://issues.apache.org/jira/browse/FLINK-13698
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.10.0
>Reporter: Piotr Nowojski
>Assignee: Biao Liu
>Priority: Critical
>
> Currently {{CheckpointCoordinator}} and {{CheckpointFailureManager}} code is 
> executed by multiple different threads (mostly {{ioExecutor}}, but not only). 
> It's causing multiple concurrency issues, for example: 
> https://issues.apache.org/jira/browse/FLINK-13497
> The proper fix would be to rethink the threading model there. At first glance 
> it doesn't seem that this code should be multi-threaded, except for the parts 
> doing the actual IO operations, so it should be possible to run everything in 
> the single ExecutionGraph thread and just run the necessary IO operations 
> asynchronously with some feedback loop ("mailbox style").
> I would strongly recommend fixing this issue before adding new features to 
> the {{CheckpointCoordinator}} component.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Closed] (FLINK-13635) Unexpectedly interrupted in AsyncFunction#timeout

2019-09-20 Thread Biao Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu closed FLINK-13635.

Resolution: Won't Fix

This scenario should not exist anymore after integrating {{AsyncWaitOperator}} 
with the mailbox threading model. [~arvid.he...@gmail.com]

> Unexpectedly interrupted in AsyncFunction#timeout
> -
>
> Key: FLINK-13635
> URL: https://issues.apache.org/jira/browse/FLINK-13635
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Affects Versions: 1.9.0
>Reporter: Biao Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently the way of handling {{AsyncFunction#timeout}} is a bit weird in 
> {{AsyncWaitOperator#processElement}}.
>  
> There are two methods in {{AsyncFunction}}: {{asyncInvoke}} and {{timeout}}. 
> {{asyncInvoke}} is executed in the task thread, while {{timeout}} is executed 
> in the system time service. When {{asyncInvoke}} finishes, it might complete 
> the {{ResultFuture}} and then cancel the registered timer of {{timeout}}. 
> However, there is no synchronization between {{asyncInvoke}}, {{timeout}} and 
> the cancellation. Moreover, this cancellation has interruption enabled.
> The {{timeout}} must be implemented very carefully, because while {{timeout}} 
> is executing, an interruption might be triggered at the same time (due to a 
> completion of the {{ResultFuture}}). That means {{timeout}} must handle 
> {{InterruptedException}} well wherever an operation reacts to this exception.
> My proposals are described below.
> 1. It should be written down in the documentation that {{asyncInvoke}} and 
> {{timeout}} might be invoked at the same time.
> 2. The interruption of {{timeout}} should be avoided. There should be 
> synchronization between the cancellation and {{timeout}}: if {{timeout}} is 
> executing, the cancellation should be avoided; if the cancellation has been 
> invoked, {{timeout}} should not be invoked anymore. Or we could simply 
> cancel the timer without an interruption.
> CC [~kkl0u], [~till.rohrmann]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15503) FileUploadHandlerTest.testMixedMultipart and FileUploadHandlerTest. testUploadCleanupOnUnknownAttribute failed on Azure

2020-01-08 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17010674#comment-17010674
 ] 

Biao Liu commented on FLINK-15503:
--

This case took over 100 seconds. It's incredibly slow. Maybe the other tests in 
the same class are just lucky enough to pass?


 BTW, can't we get the full logs (through transfer.sh) in the Azure environment 
currently? I found a relevant discussion [1] on the mailing list.

 

[1] 
[http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Migrate-build-infrastructure-from-Travis-CI-to-Azure-Pipelines-tt35538.html#a35641]

> FileUploadHandlerTest.testMixedMultipart and FileUploadHandlerTest. 
> testUploadCleanupOnUnknownAttribute failed on Azure
> ---
>
> Key: FLINK-15503
> URL: https://issues.apache.org/jira/browse/FLINK-15503
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / REST, Tests
>Affects Versions: 1.10.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.10.0
>
>
> The tests {{FileUploadHandlerTest.testMixedMultipart}} and 
> {{FileUploadHandlerTest. testUploadCleanupOnUnknownAttribute}} failed on 
> Azure with 
> {code}
> 2020-01-07T09:32:06.9840445Z [ERROR] 
> testUploadCleanupOnUnknownAttribute(org.apache.flink.runtime.rest.FileUploadHandlerTest)
>   Time elapsed: 12.457 s  <<< ERROR!
> 2020-01-07T09:32:06.9850865Z java.net.SocketTimeoutException: timeout
> 2020-01-07T09:32:06.9851650Z  at 
> org.apache.flink.runtime.rest.FileUploadHandlerTest.testUploadCleanupOnUnknownAttribute(FileUploadHandlerTest.java:234)
> 2020-01-07T09:32:06.9852910Z Caused by: java.net.SocketException: Socket 
> closed
> 2020-01-07T09:32:06.9853465Z  at 
> org.apache.flink.runtime.rest.FileUploadHandlerTest.testUploadCleanupOnUnknownAttribute(FileUploadHandlerTest.java:234)
> 2020-01-07T09:32:06.9853855Z 
> 2020-01-07T09:32:06.9854362Z [ERROR] 
> testMixedMultipart(org.apache.flink.runtime.rest.FileUploadHandlerTest)  Time 
> elapsed: 10.091 s  <<< ERROR!
> 2020-01-07T09:32:06.9855125Z java.net.SocketTimeoutException: Read timed out
> 2020-01-07T09:32:06.9855652Z  at 
> org.apache.flink.runtime.rest.FileUploadHandlerTest.testMixedMultipart(FileUploadHandlerTest.java:154)
> 2020-01-07T09:32:06.9856034Z 
> {code}
> https://dev.azure.com/rmetzger/Flink/_build/results?buildId=4159&view=results



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15541) FlinkKinesisConsumerTest.testSourceSynchronization is unstable on travis.

2020-01-12 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17014083#comment-17014083
 ] 

Biao Liu commented on FLINK-15541:
--

This is an obvious bug that makes the test case unstable. Please check the PR; 
it's easy to understand.

> FlinkKinesisConsumerTest.testSourceSynchronization is unstable on travis.
> -
>
> Key: FLINK-15541
> URL: https://issues.apache.org/jira/browse/FLINK-15541
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Affects Versions: 1.10.0
>Reporter: Xintong Song
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.10.0
>
>
> [https://api.travis-ci.org/v3/job/634712405/log.txt]
> {code:java}
> 13:16:19.144 [ERROR] Tests run: 11, Failures: 1, Errors: 0, Skipped: 0, Time 
> elapsed: 4.338 s <<< FAILURE! - in 
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumerTest
> 13:16:19.144 [ERROR] 
> testSourceSynchronization(org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumerTest)
>   Time elapsed: 1.001 s  <<< FAILURE!
> java.lang.AssertionError: expected null, but was: expected>
>   at 
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumerTest.testSourceSynchronization(FlinkKinesisConsumerTest.java:1018)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16720) Maven gets stuck downloading artifacts on Azure

2020-03-23 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17064715#comment-17064715
 ] 

Biao Liu commented on FLINK-16720:
--

Another instance, 
https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6514&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=27d1d645-cbce-54e2-51c4-d8b45fe24607

> Maven gets stuck downloading artifacts on Azure
> ---
>
> Key: FLINK-16720
> URL: https://issues.apache.org/jira/browse/FLINK-16720
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / Azure Pipelines
>Affects Versions: 1.11.0
>Reporter: Robert Metzger
>Priority: Major
>
> Logs: 
> https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6509&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=27d1d645-cbce-54e2-51c4-d8b45fe24607
> {code}
> 2020-03-23T08:43:28.4128014Z [INFO] 
> 
> 2020-03-23T08:43:28.4128557Z [INFO] Building flink-avro-confluent-registry 
> 1.11-SNAPSHOT
> 2020-03-23T08:43:28.4129129Z [INFO] 
> 
> 2020-03-23T08:48:47.6591333Z 
> ==
> 2020-03-23T08:48:47.6594540Z Maven produced no output for 300 seconds.
> 2020-03-23T08:48:47.6595164Z 
> ==
> 2020-03-23T08:48:47.6605370Z 
> ==
> 2020-03-23T08:48:47.6605803Z The following Java processes are running (JPS)
> 2020-03-23T08:48:47.6606173Z 
> ==
> 2020-03-23T08:48:47.7710037Z 920 Jps
> 2020-03-23T08:48:47.7778561Z 238 Launcher
> 2020-03-23T08:48:47.9270289Z 
> ==
> 2020-03-23T08:48:47.9270832Z Printing stack trace of Java process 967
> 2020-03-23T08:48:47.9271199Z 
> ==
> 2020-03-23T08:48:48.0165945Z 967: No such process
> 2020-03-23T08:48:48.0218260Z 
> ==
> 2020-03-23T08:48:48.0218736Z Printing stack trace of Java process 238
> 2020-03-23T08:48:48.0219075Z 
> ==
> 2020-03-23T08:48:48.3404066Z 2020-03-23 08:48:48
> 2020-03-23T08:48:48.3404828Z Full thread dump OpenJDK 64-Bit Server VM 
> (25.242-b08 mixed mode):
> 2020-03-23T08:48:48.3405064Z 
> 2020-03-23T08:48:48.3405445Z "Attach Listener" #370 daemon prio=9 os_prio=0 
> tid=0x7fe130001000 nid=0x452 waiting on condition [0x]
> 2020-03-23T08:48:48.3405868Zjava.lang.Thread.State: RUNNABLE
> 2020-03-23T08:48:48.3411202Z 
> 2020-03-23T08:48:48.3413171Z "resolver-5" #105 daemon prio=5 os_prio=0 
> tid=0x7fe1ec2ad800 nid=0x177 waiting on condition [0x7fe1872d9000]
> 2020-03-23T08:48:48.3414175Zjava.lang.Thread.State: WAITING (parking)
> 2020-03-23T08:48:48.3414560Z  at sun.misc.Unsafe.park(Native Method)
> 2020-03-23T08:48:48.3415451Z  - parking to wait for  <0x0003d5a9f828> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> 2020-03-23T08:48:48.3416180Z  at 
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> 2020-03-23T08:48:48.3416825Z  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
> 2020-03-23T08:48:48.3417602Z  at 
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
> 2020-03-23T08:48:48.3418250Z  at 
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
> 2020-03-23T08:48:48.3418930Z  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
> 2020-03-23T08:48:48.3419900Z  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 2020-03-23T08:48:48.3420395Z  at java.lang.Thread.run(Thread.java:748)
> 2020-03-23T08:48:48.3420648Z 
> 2020-03-23T08:48:48.3421424Z "resolver-4" #104 daemon prio=5 os_prio=0 
> tid=0x7fe1ec2ad000 nid=0x176 waiting on condition [0x7fe1863dd000]
> 2020-03-23T08:48:48.3421914Zjava.lang.Thread.State: WAITING (parking)
> 2020-03-23T08:48:48.3422233Z  at sun.misc.Unsafe.park(Native Method)
> 2020-03-23T08:48:48.3422919Z  - parking to wait for  <0x0003d5a9f828> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> 2020-03-23T08:48:48.3423447Z  at 
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> 2020-03-23T08:48:48.3424141Z  at 
> java.util.concurrent

[jira] [Updated] (FLINK-13848) Support “scheduleAtFixedRate/scheduleAtFixedDelay” in RpcEndpoint#MainThreadExecutor

2020-03-23 Thread Biao Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-13848:
-
Parent: (was: FLINK-13698)
Issue Type: Improvement  (was: Sub-task)

> Support “scheduleAtFixedRate/scheduleAtFixedDelay” in 
> RpcEndpoint#MainThreadExecutor
> 
>
> Key: FLINK-13848
> URL: https://issues.apache.org/jira/browse/FLINK-13848
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently the methods "scheduleAtFixedRate/scheduleAtFixedDelay" of 
> {{RpcEndpoint#MainThreadExecutor}} are not implemented, because there was no 
> requirement for them before.
> Now we are planning to implement these methods to support periodic checkpoint 
> triggering.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-13848) Support “scheduleAtFixedRate/scheduleAtFixedDelay” in RpcEndpoint#MainThreadExecutor

2020-03-23 Thread Biao Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-13848:
-
Fix Version/s: (was: 1.11.0)

> Support “scheduleAtFixedRate/scheduleAtFixedDelay” in 
> RpcEndpoint#MainThreadExecutor
> 
>
> Key: FLINK-13848
> URL: https://issues.apache.org/jira/browse/FLINK-13848
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently the methods "scheduleAtFixedRate/scheduleAtFixedDelay" of 
> {{RpcEndpoint#MainThreadExecutor}} are not implemented, because there was no 
> requirement for them before.
> Now we are planning to implement these methods to support periodic checkpoint 
> triggering.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-13848) Support “scheduleAtFixedRate/scheduleAtFixedDelay” in RpcEndpoint#MainThreadExecutor

2020-03-23 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17064761#comment-17064761
 ] 

Biao Liu commented on FLINK-13848:
--

Just an update: in FLINK-14971 we went with a work-around solution instead, which 
keeps the timer for the periodic triggering of {{CheckpointCoordinator}}. So this 
ticket is no longer necessary for FLINK-13698. Maybe someday we will come back to 
this issue if the feature is needed.

> Support “scheduleAtFixedRate/scheduleAtFixedDelay” in 
> RpcEndpoint#MainThreadExecutor
> 
>
> Key: FLINK-13848
> URL: https://issues.apache.org/jira/browse/FLINK-13848
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently the methods "scheduleAtFixedRate/scheduleAtFixedDelay" of 
> {{RpcEndpoint#MainThreadExecutor}} are not implemented, because there was no 
> requirement for them before.
> Now we are planning to implement these methods to support periodic checkpoint 
> triggering.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-9741) Register JVM metrics for each JM separately

2020-03-30 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-9741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17070980#comment-17070980
 ] 

Biao Liu commented on FLINK-9741:
-

Oops, sorry, I forgot this issue. I gave it a lower priority.

Anyway, we could discuss it a bit here.
First, I think we'd better keep the metrics under {{JobManager}} for 
compatibility. [~trohrmann], [~chesnay], do you guys think it's acceptable to 
fork the JVM metrics from {{JobManager}} to {{RM}}, {{Dispatcher}} or even 
{{JobMaster}}? We could optimize this so that they share the same metric 
instances. However, it might still annoy users that there are so many duplicated 
metrics under different metric groups, especially when reported through a 
{{MetricReporter}}.
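
A minimal sketch of the "share the same metric instance" idea (hypothetical group names, only for illustration):

{code:java}
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

public class SharedJvmMetricsSketch {

    // The gauge is created once and registered under several component groups
    // (e.g. the hypothetical RM / Dispatcher / JobMaster groups), so the reported
    // values are identical but every component still exposes its own scope.
    static void registerHeapUsed(MetricGroup... componentGroups) {
        Gauge<Long> heapUsed = () ->
                Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();

        for (MetricGroup group : componentGroups) {
            group.addGroup("Status").addGroup("JVM").addGroup("Memory")
                 .gauge("HeapUsed", heapUsed);
        }
    }
}
{code}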

> Register JVM metrics for each JM separately
> ---
>
> Key: FLINK-9741
> URL: https://issues.apache.org/jira/browse/FLINK-9741
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination, Runtime / Metrics
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Biao Liu
>Priority: Major
>
> Currently, the {{Dispatcher}} contains a {{JobManagerMetricGroup}} on which 
> the JVM metrics are registered. the JobManagers only receive a 
> {{JobManagerJobMetricGroup}} and don't register the JVM metrics.
> As the dispatcher and jobmanagers currently run in the same jvm, neither 
> exposing their IDs to the metric system, this doesn't cause problem _right 
> now_ as we can't differentiate between them anyway, but it will bite us down 
> the line if either of the above assumptions is broken.
> For example, with the proposed exposure of JM/Dispatcher IDs in FLINK-9543 we 
> would not expose JVM metrics tied to a JM, but only the Dispatcher.
> I propose to register all JVM metrics for each jobmanager separately to 
> future proof the whole thing.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16770) Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test fails with no such file

2020-04-01 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17072553#comment-17072553
 ] 

Biao Liu commented on FLINK-16770:
--

Hi [~yunta], thanks for the analysis. I have a question: if chk-8 is dropped when 
cancelling the job, chk-7 would not be subsumed, since the finalization of chk-8 
would not finish after being added to the checkpoint store asynchronously; it 
would check the discarded state before doing the subsuming.

Although I haven't checked the test case carefully, I guess this might be related 
to FLINK-14971, which made the threading model here asynchronous. There is a 
small possibility that a checkpoint is discarded but still added to the 
checkpoint store successfully, because currently the cancellation and the 
manipulation of the checkpoint store run in different threads; there is no longer 
one big lock for everything as before. Do you think it could cause this failure?

> Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end 
> test fails with no such file
> ---
>
> Key: FLINK-16770
> URL: https://issues.apache.org/jira/browse/FLINK-16770
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Tests
>Affects Versions: 1.11.0
>Reporter: Zhijiang
>Assignee: Yun Tang
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.11.0
>
> Attachments: e2e-output.log, 
> flink-vsts-standalonesession-0-fv-az53.log
>
>
> The log : 
> [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6603&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5]
>  
> There was also the similar problem in 
> https://issues.apache.org/jira/browse/FLINK-16561, but for the case of no 
> parallelism change. And this case is for scaling up. Not quite sure whether 
> the root cause is the same one.
> {code:java}
> 2020-03-25T06:50:31.3894841Z Running 'Resuming Externalized Checkpoint 
> (rocks, incremental, scale up) end-to-end test'
> 2020-03-25T06:50:31.3895308Z 
> ==
> 2020-03-25T06:50:31.3907274Z TEST_DATA_DIR: 
> /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304
> 2020-03-25T06:50:31.5500274Z Flink dist directory: 
> /home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT
> 2020-03-25T06:50:31.6354639Z Starting cluster.
> 2020-03-25T06:50:31.8871932Z Starting standalonesession daemon on host 
> fv-az655.
> 2020-03-25T06:50:33.5021784Z Starting taskexecutor daemon on host fv-az655.
> 2020-03-25T06:50:33.5152274Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:34.5498116Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:35.6031346Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:36.9848425Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:38.0283377Z Dispatcher REST endpoint is up.
> 2020-03-25T06:50:38.0285490Z Running externalized checkpoints test, with 
> ORIGINAL_DOP=2 NEW_DOP=4 and STATE_BACKEND_TYPE=rocks 
> STATE_BACKEND_FILE_ASYNC=true STATE_BACKEND_ROCKSDB_INCREMENTAL=true 
> SIMULATE_FAILURE=false ...
> 2020-03-25T06:50:46.1754645Z Job (b8cb04e4b1e730585bc616aa352866d0) is 
> running.
> 2020-03-25T06:50:46.1758132Z Waiting for job 
> (b8cb04e4b1e730585bc616aa352866d0) to have at least 1 completed checkpoints 
> ...
> 2020-03-25T06:50:46.3478276Z Waiting for job to process up to 200 records, 
> current progress: 173 records ...
> 2020-03-25T06:50:49.6332988Z Cancelling job b8cb04e4b1e730585bc616aa352866d0.
> 2020-03-25T06:50:50.4875673Z Cancelled job b8cb04e4b1e730585bc616aa352866d0.
> 2020-03-25T06:50:50.5468230Z ls: cannot access 
> '/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304/externalized-chckpt-e2e-backend-dir/b8cb04e4b1e730585bc616aa352866d0/chk-[1-9]*/_metadata':
>  No such file or directory
> 2020-03-25T06:50:50.5606260Z Restoring job with externalized checkpoint at . 
> ...
> 2020-03-25T06:50:58.4728245Z 
> 2020-03-25T06:50:58.4732663Z 
> 
> 2020-03-25T06:50:58.4735785Z  The program finished with the following 
> exception:
> 2020-03-25T06:50:58.4737759Z 
> 2020-03-25T06:50:58.4742666Z 
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
> JobGraph.
> 2020-03-25T06:50:58.4746274Z  at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> 2020-03-2

[jira] [Commented] (FLINK-16770) Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test fails with no such file

2020-04-01 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17073388#comment-17073388
 ] 

Biao Liu commented on FLINK-16770:
--

Hi [~yunta], thanks for the response. If I understand correctly, there is an 
inconsistent state of the {{CompletedCheckpointStore}} when stopping a checkpoint 
which is doing its asynchronous finalization.

There are two strategies here:
1. The checkpoint which is doing the finalization could be aborted when the 
{{CheckpointCoordinator}} is being shut down or the periodic scheduler is being 
stopped. This is the choice of the current implementation. However, we didn't 
handle the {{CompletedCheckpointStore}} well. For example, it might be better to 
revert the state of the {{CompletedCheckpointStore}} when the 
{{PendingCheckpoint}} finds it was discarded after the asynchronous finalization. 
But I think that's not easy to do, because there might be a subsuming operation 
during {{CompletedCheckpointStore#addCheckpoint}}.
2. The checkpoint which is doing the finalization could NOT be aborted when the 
{{CheckpointCoordinator}} is being shut down or the periodic scheduler is being 
stopped. I personally prefer this solution, because it could simplify the 
concurrent conflict scenario and it's much easier to implement. I think 
introducing an atomic boolean might not be enough; it's better to rethink the 
relationship between {{PendingCheckpoint#abort}} and 
{{PendingCheckpoint#finalizeCheckpoint}}. And we also need to rewrite part of the 
error handling of the finalization.

BTW, [~yunta], could you share the unit test case which reproduces the scenario 
locally? I want to verify my suggestion and solution. The original e2e test case 
is not stable.
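
A minimal sketch of what I mean by rethinking that relationship (hypothetical, not the actual {{PendingCheckpoint}} code):

{code:java}
import java.util.concurrent.atomic.AtomicReference;

public class FinalizationGuardSketch {

    enum State { PENDING, FINALIZING, COMPLETED, ABORTED }

    private final AtomicReference<State> state = new AtomicReference<>(State.PENDING);

    /** Called from the shutdown path; returns true only if the abort took effect. */
    boolean abort() {
        // Strategy 2: once the finalization has started it may not be aborted anymore,
        // so an abort can no longer race with CompletedCheckpointStore#addCheckpoint.
        return state.compareAndSet(State.PENDING, State.ABORTED);
    }

    /** Called when all tasks have acknowledged; returns true if this call owns the finalization. */
    boolean startFinalization() {
        return state.compareAndSet(State.PENDING, State.FINALIZING);
    }

    /** Only reached by the owner of the FINALIZING state, after the store was updated. */
    void completeFinalization() {
        state.set(State.COMPLETED);
    }
}
{code}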

> Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end 
> test fails with no such file
> ---
>
> Key: FLINK-16770
> URL: https://issues.apache.org/jira/browse/FLINK-16770
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Tests
>Affects Versions: 1.11.0
>Reporter: Zhijiang
>Assignee: Yun Tang
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.11.0
>
> Attachments: e2e-output.log, 
> flink-vsts-standalonesession-0-fv-az53.log
>
>
> The log : 
> [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6603&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5]
>  
> There was also the similar problem in 
> https://issues.apache.org/jira/browse/FLINK-16561, but for the case of no 
> parallelism change. And this case is for scaling up. Not quite sure whether 
> the root cause is the same one.
> {code:java}
> 2020-03-25T06:50:31.3894841Z Running 'Resuming Externalized Checkpoint 
> (rocks, incremental, scale up) end-to-end test'
> 2020-03-25T06:50:31.3895308Z 
> ==
> 2020-03-25T06:50:31.3907274Z TEST_DATA_DIR: 
> /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304
> 2020-03-25T06:50:31.5500274Z Flink dist directory: 
> /home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT
> 2020-03-25T06:50:31.6354639Z Starting cluster.
> 2020-03-25T06:50:31.8871932Z Starting standalonesession daemon on host 
> fv-az655.
> 2020-03-25T06:50:33.5021784Z Starting taskexecutor daemon on host fv-az655.
> 2020-03-25T06:50:33.5152274Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:34.5498116Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:35.6031346Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:36.9848425Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:38.0283377Z Dispatcher REST endpoint is up.
> 2020-03-25T06:50:38.0285490Z Running externalized checkpoints test, with 
> ORIGINAL_DOP=2 NEW_DOP=4 and STATE_BACKEND_TYPE=rocks 
> STATE_BACKEND_FILE_ASYNC=true STATE_BACKEND_ROCKSDB_INCREMENTAL=true 
> SIMULATE_FAILURE=false ...
> 2020-03-25T06:50:46.1754645Z Job (b8cb04e4b1e730585bc616aa352866d0) is 
> running.
> 2020-03-25T06:50:46.1758132Z Waiting for job 
> (b8cb04e4b1e730585bc616aa352866d0) to have at least 1 completed checkpoints 
> ...
> 2020-03-25T06:50:46.3478276Z Waiting for job to process up to 200 records, 
> current progress: 173 records ...
> 2020-03-25T06:50:49.6332988Z Cancelling job b8cb04e4b1e730585bc616aa352866d0.
> 2020-03-25T06:50:50.4875673Z Cancelled job b8cb04e4b1e730585bc616aa352866d0.
> 2020-03-25T06:50:50.5468230Z ls: cannot access 
> '/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304/externalized-chckpt-e2e-backend-dir/b8cb04e4b1e730585bc616aa352866d0/chk-[1-9]*/_meta

[jira] [Comment Edited] (FLINK-16770) Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test fails with no such file

2020-04-01 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17073388#comment-17073388
 ] 

Biao Liu edited comment on FLINK-16770 at 4/2/20, 5:52 AM:
---

Hi [~yunta], thanks for the response. If I understand correctly, there is an 
inconsistent state of the {{CompletedCheckpointStore}} when stopping a checkpoint 
which is doing its asynchronous finalization.

There are two strategies here:
 1. The checkpoint which is doing the finalization could be aborted when the 
{{CheckpointCoordinator}} is being shut down or the periodic scheduler is being 
stopped. This is the choice of the current implementation. However, we didn't 
handle the {{CompletedCheckpointStore}} well. For example, it might be better to 
revert the state of the {{CompletedCheckpointStore}} when the 
{{PendingCheckpoint}} finds it was discarded after the asynchronous finalization. 
But I think that's not easy to do, because there might be a subsuming operation 
during {{CompletedCheckpointStore#addCheckpoint}}.
 2. The checkpoint which is doing the finalization could NOT be aborted when the 
{{CheckpointCoordinator}} is being shut down or the periodic scheduler is being 
stopped. I personally prefer this solution, because it could simplify the 
concurrent conflict scenario and it's much easier to implement. I think 
introducing an atomic boolean might not be enough; it's better to rethink the 
relationship between {{PendingCheckpoint#abort}} and 
{{PendingCheckpoint#finalizeCheckpoint}}. And we also need to rewrite part of the 
error handling of the finalization.

BTW, [~yunta], could you share the unit test case which reproduces the scenario 
locally? I want to verify my assumption and solution. The original e2e test case 
is not stable.


was (Author: sleepy):
Hi [~yunta], thanks for the response. If I understand correctly, there is an 
inconsistent state of {{CompletedCheckpointStore}} while stopping a checkpoint 
which is doing asynchronous finalization.

There are two strategy here,
1. The checkpoint which is doing finalization could be aborted when 
{{CheckpointCoordinator}} is being shut down or periodic scheduler is being 
stopped. This is the choice of current implementation. However we didn't handle 
the {{CompletedCheckpointStore}} well. For example it might be better that 
reverting the state of {{CompletedCheckpointStore}} when the 
{{PendingCheckpoint}} finds the discarding after asynchronous finalization. But 
I think it's not easy to do so. Because there might be a subsuming operation 
during {{CompletedCheckpointStore#addCheckpoint}}.
2. The checkpoint which is doing finalization could NOT be aborted when 
{{CheckpointCoordinator}} is being shut down or period scheduler is being 
stopped. I personally prefer this solution, because it could simply the 
concurrent conflict scenario and it's much easier to implement. I think 
introducing an atomic boolean might not be enough. It's better to rethink the 
relationship between {{PendingCheckpoint#abort}} and 
{{PendingCheckpoint#finalizeCheckpoint}}. And we also need to rewrite a part of 
error handling of the finalization. 

BTW, [~yunta] could you share the unit test case which could reproduce the 
scenario locally? I want to verify my suggestion and solution. The original e2e 
test case is not stable.

> Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end 
> test fails with no such file
> ---
>
> Key: FLINK-16770
> URL: https://issues.apache.org/jira/browse/FLINK-16770
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Tests
>Affects Versions: 1.11.0
>Reporter: Zhijiang
>Assignee: Yun Tang
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.11.0
>
> Attachments: e2e-output.log, 
> flink-vsts-standalonesession-0-fv-az53.log
>
>
> The log : 
> [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6603&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5]
>  
> There was also the similar problem in 
> https://issues.apache.org/jira/browse/FLINK-16561, but for the case of no 
> parallelism change. And this case is for scaling up. Not quite sure whether 
> the root cause is the same one.
> {code:java}
> 2020-03-25T06:50:31.3894841Z Running 'Resuming Externalized Checkpoint 
> (rocks, incremental, scale up) end-to-end test'
> 2020-03-25T06:50:31.3895308Z 
> ==
> 2020-03-25T06:50:31.3907274Z TEST_DATA_DIR: 
> /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304
> 2020-03-25T06:50:31.5500274Z Flink dist directory: 
> /home/vsts/work/1/s/flink-dist/target/fl

[jira] [Closed] (FLINK-16561) Resuming Externalized Checkpoint (rocks, incremental, no parallelism change) end-to-end test fails on Azure

2020-04-02 Thread Biao Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu closed FLINK-16561.

Resolution: Duplicate

> Resuming Externalized Checkpoint (rocks, incremental, no parallelism change) 
> end-to-end test fails on Azure
> ---
>
> Key: FLINK-16561
> URL: https://issues.apache.org/jira/browse/FLINK-16561
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Tests
>Affects Versions: 1.11.0
>Reporter: Biao Liu
>Assignee: Yun Tang
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.11.0
>
>
> {quote}Caused by: java.io.IOException: Cannot access file system for 
> checkpoint/savepoint path 'file://.'.
>   at 
> org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpointPointer(AbstractFsCheckpointStorage.java:233)
>   at 
> org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpoint(AbstractFsCheckpointStorage.java:110)
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1332)
>   at 
> org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:314)
>   at 
> org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:247)
>   at 
> org.apache.flink.runtime.scheduler.SchedulerBase.(SchedulerBase.java:223)
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.(DefaultScheduler.java:118)
>   at 
> org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103)
>   at 
> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:281)
>   at 
> org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:269)
>   at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
>   at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
>   at 
> org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.(JobManagerRunnerImpl.java:146)
>   ... 10 more
> Caused by: java.io.IOException: Found local file path with authority '.' in 
> path 'file://.'. Hint: Did you forget a slash? (correct path would be 
> 'file:///.')
>   at 
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:441)
>   at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:389)
>   at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
>   at 
> org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpointPointer(AbstractFsCheckpointStorage.java:230)
>   ... 22 more
> {quote}
> The original log is here, 
> https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6073&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=2b7514ee-e706-5046-657b-3430666e7bd9
> There are some similar tickets about this case, but the stack here looks 
> different. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16561) Resuming Externalized Checkpoint (rocks, incremental, no parallelism change) end-to-end test fails on Azure

2020-04-02 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17073441#comment-17073441
 ] 

Biao Liu commented on FLINK-16561:
--

[~rmetzger], I have closed this ticket, let's discuss under FLINK-16770.

> Resuming Externalized Checkpoint (rocks, incremental, no parallelism change) 
> end-to-end test fails on Azure
> ---
>
> Key: FLINK-16561
> URL: https://issues.apache.org/jira/browse/FLINK-16561
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Tests
>Affects Versions: 1.11.0
>Reporter: Biao Liu
>Assignee: Yun Tang
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.11.0
>
>
> {quote}Caused by: java.io.IOException: Cannot access file system for 
> checkpoint/savepoint path 'file://.'.
>   at 
> org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpointPointer(AbstractFsCheckpointStorage.java:233)
>   at 
> org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpoint(AbstractFsCheckpointStorage.java:110)
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1332)
>   at 
> org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:314)
>   at 
> org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:247)
>   at 
> org.apache.flink.runtime.scheduler.SchedulerBase.(SchedulerBase.java:223)
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.(DefaultScheduler.java:118)
>   at 
> org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103)
>   at 
> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:281)
>   at 
> org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:269)
>   at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
>   at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
>   at 
> org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.(JobManagerRunnerImpl.java:146)
>   ... 10 more
> Caused by: java.io.IOException: Found local file path with authority '.' in 
> path 'file://.'. Hint: Did you forget a slash? (correct path would be 
> 'file:///.')
>   at 
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:441)
>   at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:389)
>   at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
>   at 
> org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpointPointer(AbstractFsCheckpointStorage.java:230)
>   ... 22 more
> {quote}
> The original log is here, 
> https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6073&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=2b7514ee-e706-5046-657b-3430666e7bd9
> There are some similar tickets about this case, but the stack here looks 
> different. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16945) Execute CheckpointFailureManager.FailJobCallback directly in main thread executor

2020-04-02 Thread Biao Liu (Jira)
Biao Liu created FLINK-16945:


 Summary: Execute CheckpointFailureManager.FailJobCallback directly 
in main thread executor
 Key: FLINK-16945
 URL: https://issues.apache.org/jira/browse/FLINK-16945
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Affects Versions: 1.10.0
Reporter: Biao Liu
 Fix For: 1.11.0


Since we have put all non-IO operations of {{CheckpointCoordinator}} into the main 
thread executor, the {{CheckpointFailureManager.FailJobCallback}} can now be 
executed directly. This way the execution graph fails immediately when 
{{CheckpointFailureManager}} invokes the callback, and we avoid the inconsistent 
scenario of FLINK-13497.
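
As a rough illustration only (hypothetical names, not the actual Flink classes), 
the difference between dispatching the callback through the executor and invoking 
it directly looks roughly like this:

{code:java}
import java.util.concurrent.Executor;

// Hypothetical sketch; not the real CheckpointFailureManager / FailJobCallback.
public class FailCallbackSketch {

    /** Stand-in for CheckpointFailureManager.FailJobCallback. */
    interface FailJobCallback {
        void failJob(Throwable cause);
    }

    public static void main(String[] args) {
        FailJobCallback callback =
                cause -> System.out.println("Failing job because: " + cause.getMessage());

        // Before: the callback is handed to the main thread executor, so the job
        // is failed only at some later point, leaving a window for inconsistency.
        Executor mainThreadExecutor = Runnable::run; // direct executor, just for the sketch
        mainThreadExecutor.execute(
                () -> callback.failJob(new Exception("checkpoint failure, async dispatch")));

        // After: the caller already runs in the main thread (all non-IO work of the
        // coordinator does), so the callback can be invoked directly and the
        // execution graph fails immediately.
        callback.failJob(new Exception("checkpoint failure, direct call"));
    }
}
{code}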



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16931) Large _metadata file lead to JobManager not responding when restart

2020-04-02 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17073927#comment-17073927
 ] 

Biao Liu commented on FLINK-16931:
--

[~trohrmann], my pleasure :)

I could share some context here. We discussed this scenario before refactoring 
the whole threading model of {{CheckpointCoordinator}}, see FLINK-13497 and 
FLINK-13698. Although this scenario is not the cause of FLINK-13497, we think 
there is a risk of heartbeat timeout. At that time, we decided to treat it as a 
follow-up issue. However, we haven't filed any ticket for it yet.

After FLINK-13698, most of the non-IO operations of {{CheckpointCoordinator}} are 
executed in the main thread executor, except for the initialization part which 
causes this problem. One of the final targets is putting all IO operations of 
{{CheckpointCoordinator}} into the IO thread executor, while everything else is 
executed in the main thread executor. To achieve this, some synchronous 
operations must be refactored into asynchronous ones. I think that's what we 
need to do here.
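
As a rough illustration only (hypothetical names, nothing from the actual Flink 
code base), the asynchronous pattern would look roughly like this: run the 
IO-heavy step on an IO executor and hand the result back to the main thread 
executor.

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch of offloading a blocking step (e.g. reading checkpoint
// metadata from DFS) to an IO executor and continuing in the main thread.
public class AsyncIoSketch {
    public static void main(String[] args) throws Exception {
        ExecutorService ioExecutor = Executors.newFixedThreadPool(2);
        Executor mainThreadExecutor = Runnable::run; // placeholder for the RPC main thread executor

        CompletableFuture<String> restored = CompletableFuture
                // IO-heavy part runs in the IO thread pool and may block there
                .supplyAsync(() -> "checkpoint metadata", ioExecutor)
                // non-IO part is handed back to the main thread executor,
                // so the coordinator state stays effectively single-threaded
                .thenApplyAsync(metadata -> "restored from " + metadata, mainThreadExecutor);

        System.out.println(restored.get());
        ioExecutor.shutdown();
    }
}
{code}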

> Large _metadata file lead to JobManager not responding when restart
> ---
>
> Key: FLINK-16931
> URL: https://issues.apache.org/jira/browse/FLINK-16931
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / Coordination
>Affects Versions: 1.9.2, 1.10.0, 1.11.0
>Reporter: Lu Niu
>Priority: Critical
> Fix For: 1.11.0
>
>
> When the _metadata file is big, the JobManager could never recover from the 
> checkpoint. It falls into a loop of fetch checkpoint -> JM timeout -> restart. 
> Here is the related log: 
> {code:java}
>  2020-04-01 17:08:25,689 INFO 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - 
> Recovering checkpoints from ZooKeeper.
>  2020-04-01 17:08:25,698 INFO 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Found 
> 3 checkpoints in ZooKeeper.
>  2020-04-01 17:08:25,698 INFO 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - 
> Trying to fetch 3 checkpoints from storage.
>  2020-04-01 17:08:25,698 INFO 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - 
> Trying to retrieve checkpoint 50.
>  2020-04-01 17:08:48,589 INFO 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - 
> Trying to retrieve checkpoint 51.
>  2020-04-01 17:09:12,775 INFO org.apache.flink.yarn.YarnResourceManager - The 
> heartbeat of JobManager with id 02500708baf0bb976891c391afd3d7d5 timed out.
> {code}
> Digging into the code, it looks like ExecutionGraph::restart runs in the 
> JobMaster main thread and finally calls 
> ZooKeeperCompletedCheckpointStore::retrieveCompletedCheckpoint, which downloads 
> the file from DFS. The main thread is basically blocked for a while because of 
> this. One possible solution is to make the downloading part async. More things 
> might need to be considered, as the original change tries to make it 
> single-threaded. [https://github.com/apache/flink/pull/7568]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-16931) Large _metadata file lead to JobManager not responding when restart

2020-04-02 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17073927#comment-17073927
 ] 

Biao Liu edited comment on FLINK-16931 at 4/2/20, 5:31 PM:
---

[~trohrmann], my pleasure :)

I could share some context here. We have discussed this scenario before 
refactoring the whole threading model of {{CheckpointCoordinator}}, see 
FLINK-13497 and FLINK-13698. Although this scenario is not the cause of 
FLINK-13497, we think there is a risk of heartbeat timeout. At that time, we 
decided to treat it as a follow-up issue. However we haven't file any ticket 
for it yet.

After FLINK-13698, most parts of the non-IO operations of 
{{CheckpointCoordinator}} are executed in main thread executor, except the 
initialization part which causes this problem. One of the final targets is 
putting all IO operations of {{CheckpointCoordinator}} into IO thread executor, 
others are executed in main thread executor. To achieve this, some synchronous 
operations must be refactored into asynchronous ways. I think that's what we 
need to do here.


was (Author: sleepy):
[~trohrmann], my pleasure :)

I could share some context here. We have discussed this scenario before 
refactoring the whole threading model of {{CheckpointCoordinator}}, see 
FLINK-13497 and FLINK-13698. Although this scenario is not the cause of 
FLINK-13497, we think there is risk of heartbeat timeout. At that time, we 
decided to treat it as a follow-up issue. However we haven't file any ticket 
for it yet.

After FLINK-13698, most parts of the non-IO operations of 
{{CheckpointCoordinator}} are executed in main thread executor, except the 
initialization part which causes this problem. One of the final targets is 
putting all IO operations of {{CheckpointCoordinator}} into IO thread executor, 
others are executed in main thread executor. To achieve this, some synchronous 
operations must be refactored into asynchronous ways. I think that's what we 
need to do here.

> Large _metadata file lead to JobManager not responding when restart
> ---
>
> Key: FLINK-16931
> URL: https://issues.apache.org/jira/browse/FLINK-16931
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / Coordination
>Affects Versions: 1.9.2, 1.10.0, 1.11.0
>Reporter: Lu Niu
>Priority: Critical
> Fix For: 1.11.0
>
>
> When the _metadata file is big, the JobManager could never recover from the 
> checkpoint. It falls into a loop of fetch checkpoint -> JM timeout -> restart. 
> Here is the related log: 
> {code:java}
>  2020-04-01 17:08:25,689 INFO 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - 
> Recovering checkpoints from ZooKeeper.
>  2020-04-01 17:08:25,698 INFO 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Found 
> 3 checkpoints in ZooKeeper.
>  2020-04-01 17:08:25,698 INFO 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - 
> Trying to fetch 3 checkpoints from storage.
>  2020-04-01 17:08:25,698 INFO 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - 
> Trying to retrieve checkpoint 50.
>  2020-04-01 17:08:48,589 INFO 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - 
> Trying to retrieve checkpoint 51.
>  2020-04-01 17:09:12,775 INFO org.apache.flink.yarn.YarnResourceManager - The 
> heartbeat of JobManager with id 02500708baf0bb976891c391afd3d7d5 timed out.
> {code}
> Digging into the code, it looks like ExecutionGraph::restart runs in the 
> JobMaster main thread and finally calls 
> ZooKeeperCompletedCheckpointStore::retrieveCompletedCheckpoint, which downloads 
> the file from DFS. The main thread is basically blocked for a while because of 
> this. One possible solution is to make the downloading part async. More things 
> might need to be considered, as the original change tries to make it 
> single-threaded. [https://github.com/apache/flink/pull/7568]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-16931) Large _metadata file lead to JobManager not responding when restart

2020-04-02 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17073927#comment-17073927
 ] 

Biao Liu edited comment on FLINK-16931 at 4/2/20, 5:31 PM:
---

[~trohrmann], my pleasure :)

I could share some context here. We have discussed this scenario before 
refactoring the whole threading model of {{CheckpointCoordinator}}, see 
FLINK-13497 and FLINK-13698. Although this scenario is not the cause of 
FLINK-13497, we think there is risk of heartbeat timeout. At that time, we 
decided to treat it as a follow-up issue. However we haven't file any ticket 
for it yet.

After FLINK-13698, most parts of the non-IO operations of 
{{CheckpointCoordinator}} are executed in main thread executor, except the 
initialization part which causes this problem. One of the final targets is 
putting all IO operations of {{CheckpointCoordinator}} into IO thread executor, 
others are executed in main thread executor. To achieve this, some synchronous 
operations must be refactored into asynchronous ways. I think that's what we 
need to do here.


was (Author: sleepy):
[~trohrmann], my pleasure :)

I could share some context here. We have discussed this scenario refactoring 
the whole threading model of {{CheckpointCoordinator}}, see FLINK-13497 and 
FLINK-13698. Although this scenario is not the cause of FLINK-13497, we think 
there is risk of heartbeat timeout. At that time, we decided to treat it as a 
follow-up issue. However we haven't file any ticket for it yet. 

After FLINK-13698, most parts of the non-IO operations of 
{{CheckpointCoordinator}} are executed in main thread executor, except the 
initialization part which causes this problem. One of the final targets is 
putting all IO operations of {{CheckpointCoordinator}} into IO thread executor, 
others are executed in main thread executor. To achieve this, some synchronous 
operations must be refactored into asynchronous ways. I think that's what we 
need to do here. 

> Large _metadata file lead to JobManager not responding when restart
> ---
>
> Key: FLINK-16931
> URL: https://issues.apache.org/jira/browse/FLINK-16931
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / Coordination
>Affects Versions: 1.9.2, 1.10.0, 1.11.0
>Reporter: Lu Niu
>Priority: Critical
> Fix For: 1.11.0
>
>
> When the _metadata file is big, the JobManager could never recover from the 
> checkpoint. It falls into a loop of fetch checkpoint -> JM timeout -> restart. 
> Here is the related log: 
> {code:java}
>  2020-04-01 17:08:25,689 INFO 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - 
> Recovering checkpoints from ZooKeeper.
>  2020-04-01 17:08:25,698 INFO 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Found 
> 3 checkpoints in ZooKeeper.
>  2020-04-01 17:08:25,698 INFO 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - 
> Trying to fetch 3 checkpoints from storage.
>  2020-04-01 17:08:25,698 INFO 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - 
> Trying to retrieve checkpoint 50.
>  2020-04-01 17:08:48,589 INFO 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - 
> Trying to retrieve checkpoint 51.
>  2020-04-01 17:09:12,775 INFO org.apache.flink.yarn.YarnResourceManager - The 
> heartbeat of JobManager with id 02500708baf0bb976891c391afd3d7d5 timed out.
> {code}
> Digging into the code, it looks like ExecutionGraph::restart runs in the 
> JobMaster main thread and finally calls 
> ZooKeeperCompletedCheckpointStore::retrieveCompletedCheckpoint, which downloads 
> the file from DFS. The main thread is basically blocked for a while because of 
> this. One possible solution is to make the downloading part async. More things 
> might need to be considered, as the original change tries to make it 
> single-threaded. [https://github.com/apache/flink/pull/7568]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16770) Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test fails with no such file

2020-04-03 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17074438#comment-17074438
 ] 

Biao Liu commented on FLINK-16770:
--

After a short discussion with [~yunta] offline, we reached an agreement on a 
possible solution. [~yunta] will continue working on it.

Besides that, we think it's better to quickly fix the failing case first, so that 
others can avoid suffering from this unstable test. I have created a PR that 
tries to resolve the failing case in a work-around way. [~yunta], could you take 
a look and check whether anything is missing?

> Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end 
> test fails with no such file
> ---
>
> Key: FLINK-16770
> URL: https://issues.apache.org/jira/browse/FLINK-16770
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Tests
>Affects Versions: 1.11.0
>Reporter: Zhijiang
>Assignee: Yun Tang
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.11.0
>
> Attachments: e2e-output.log, 
> flink-vsts-standalonesession-0-fv-az53.log
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The log : 
> [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6603&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5]
>  
> There was also a similar problem in 
> https://issues.apache.org/jira/browse/FLINK-16561, but for the case of no 
> parallelism change. And this case is for scaling up. Not quite sure whether 
> the root cause is the same one.
> {code:java}
> 2020-03-25T06:50:31.3894841Z Running 'Resuming Externalized Checkpoint 
> (rocks, incremental, scale up) end-to-end test'
> 2020-03-25T06:50:31.3895308Z 
> ==
> 2020-03-25T06:50:31.3907274Z TEST_DATA_DIR: 
> /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304
> 2020-03-25T06:50:31.5500274Z Flink dist directory: 
> /home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT
> 2020-03-25T06:50:31.6354639Z Starting cluster.
> 2020-03-25T06:50:31.8871932Z Starting standalonesession daemon on host 
> fv-az655.
> 2020-03-25T06:50:33.5021784Z Starting taskexecutor daemon on host fv-az655.
> 2020-03-25T06:50:33.5152274Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:34.5498116Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:35.6031346Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:36.9848425Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:38.0283377Z Dispatcher REST endpoint is up.
> 2020-03-25T06:50:38.0285490Z Running externalized checkpoints test, with 
> ORIGINAL_DOP=2 NEW_DOP=4 and STATE_BACKEND_TYPE=rocks 
> STATE_BACKEND_FILE_ASYNC=true STATE_BACKEND_ROCKSDB_INCREMENTAL=true 
> SIMULATE_FAILURE=false ...
> 2020-03-25T06:50:46.1754645Z Job (b8cb04e4b1e730585bc616aa352866d0) is 
> running.
> 2020-03-25T06:50:46.1758132Z Waiting for job 
> (b8cb04e4b1e730585bc616aa352866d0) to have at least 1 completed checkpoints 
> ...
> 2020-03-25T06:50:46.3478276Z Waiting for job to process up to 200 records, 
> current progress: 173 records ...
> 2020-03-25T06:50:49.6332988Z Cancelling job b8cb04e4b1e730585bc616aa352866d0.
> 2020-03-25T06:50:50.4875673Z Cancelled job b8cb04e4b1e730585bc616aa352866d0.
> 2020-03-25T06:50:50.5468230Z ls: cannot access 
> '/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304/externalized-chckpt-e2e-backend-dir/b8cb04e4b1e730585bc616aa352866d0/chk-[1-9]*/_metadata':
>  No such file or directory
> 2020-03-25T06:50:50.5606260Z Restoring job with externalized checkpoint at . 
> ...
> 2020-03-25T06:50:58.4728245Z 
> 2020-03-25T06:50:58.4732663Z 
> 
> 2020-03-25T06:50:58.4735785Z  The program finished with the following 
> exception:
> 2020-03-25T06:50:58.4737759Z 
> 2020-03-25T06:50:58.4742666Z 
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
> JobGraph.
> 2020-03-25T06:50:58.4746274Z  at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> 2020-03-25T06:50:58.4749954Z  at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> 2020-03-25T06:50:58.4752753Z  at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:142)
> 2020-03-25T06:50:58.4755400Z  at 

[jira] [Commented] (FLINK-16770) Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test fails with no such file

2020-04-06 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17076161#comment-17076161
 ] 

Biao Liu commented on FLINK-16770:
--

Thanks [~rmetzger] for manually verifying and merging the PR. 

> Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end 
> test fails with no such file
> ---
>
> Key: FLINK-16770
> URL: https://issues.apache.org/jira/browse/FLINK-16770
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Tests
>Affects Versions: 1.11.0
>Reporter: Zhijiang
>Assignee: Yun Tang
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.11.0
>
> Attachments: e2e-output.log, 
> flink-vsts-standalonesession-0-fv-az53.log
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The log : 
> [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6603&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5]
>  
> There was also a similar problem in 
> https://issues.apache.org/jira/browse/FLINK-16561, but for the case of no 
> parallelism change. And this case is for scaling up. Not quite sure whether 
> the root cause is the same one.
> {code:java}
> 2020-03-25T06:50:31.3894841Z Running 'Resuming Externalized Checkpoint 
> (rocks, incremental, scale up) end-to-end test'
> 2020-03-25T06:50:31.3895308Z 
> ==
> 2020-03-25T06:50:31.3907274Z TEST_DATA_DIR: 
> /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304
> 2020-03-25T06:50:31.5500274Z Flink dist directory: 
> /home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT
> 2020-03-25T06:50:31.6354639Z Starting cluster.
> 2020-03-25T06:50:31.8871932Z Starting standalonesession daemon on host 
> fv-az655.
> 2020-03-25T06:50:33.5021784Z Starting taskexecutor daemon on host fv-az655.
> 2020-03-25T06:50:33.5152274Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:34.5498116Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:35.6031346Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:36.9848425Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:38.0283377Z Dispatcher REST endpoint is up.
> 2020-03-25T06:50:38.0285490Z Running externalized checkpoints test, with 
> ORIGINAL_DOP=2 NEW_DOP=4 and STATE_BACKEND_TYPE=rocks 
> STATE_BACKEND_FILE_ASYNC=true STATE_BACKEND_ROCKSDB_INCREMENTAL=true 
> SIMULATE_FAILURE=false ...
> 2020-03-25T06:50:46.1754645Z Job (b8cb04e4b1e730585bc616aa352866d0) is 
> running.
> 2020-03-25T06:50:46.1758132Z Waiting for job 
> (b8cb04e4b1e730585bc616aa352866d0) to have at least 1 completed checkpoints 
> ...
> 2020-03-25T06:50:46.3478276Z Waiting for job to process up to 200 records, 
> current progress: 173 records ...
> 2020-03-25T06:50:49.6332988Z Cancelling job b8cb04e4b1e730585bc616aa352866d0.
> 2020-03-25T06:50:50.4875673Z Cancelled job b8cb04e4b1e730585bc616aa352866d0.
> 2020-03-25T06:50:50.5468230Z ls: cannot access 
> '/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304/externalized-chckpt-e2e-backend-dir/b8cb04e4b1e730585bc616aa352866d0/chk-[1-9]*/_metadata':
>  No such file or directory
> 2020-03-25T06:50:50.5606260Z Restoring job with externalized checkpoint at . 
> ...
> 2020-03-25T06:50:58.4728245Z 
> 2020-03-25T06:50:58.4732663Z 
> 
> 2020-03-25T06:50:58.4735785Z  The program finished with the following 
> exception:
> 2020-03-25T06:50:58.4737759Z 
> 2020-03-25T06:50:58.4742666Z 
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
> JobGraph.
> 2020-03-25T06:50:58.4746274Z  at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> 2020-03-25T06:50:58.4749954Z  at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> 2020-03-25T06:50:58.4752753Z  at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:142)
> 2020-03-25T06:50:58.4755400Z  at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:659)
> 2020-03-25T06:50:58.4757862Z  at 
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
> 2020-03-25T06:50:58.4760282Z  at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:890)
> 2020-03-25T06:50:58.4763591Z  at 
> org.apach

[jira] [Commented] (FLINK-15611) KafkaITCase.testOneToOneSources fails on Travis

2020-01-16 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17017767#comment-17017767
 ] 

Biao Liu commented on FLINK-15611:
--

{quote}
...
Caused by: java.lang.Exception: Received a duplicate: 4924
at 
org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink.invoke(ValidatingExactlyOnceSink.java:57)
at 
org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink.invoke(ValidatingExactlyOnceSink.java:36)
at 
org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:170)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:702)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:527)
at java.lang.Thread.run(Thread.java:748)
{quote}

It looks like a serious issue. The exactly-once semantics here seem to be 
broken.
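
For context, here is a simplified sketch (not the real ValidatingExactlyOnceSink, 
just the idea behind it) of the check that produces the "Received a duplicate" 
error: each value must arrive exactly once, so a repeated value means the 
guarantee was violated somewhere upstream.

{code:java}
import java.util.BitSet;

// Simplified sketch of a validating exactly-once sink; illustrative only.
public class ValidatingSinkSketch {
    private final BitSet seen = new BitSet();

    void invoke(int value) throws Exception {
        if (seen.get(value)) {
            throw new Exception("Received a duplicate: " + value);
        }
        seen.set(value);
    }

    public static void main(String[] args) throws Exception {
        ValidatingSinkSketch sink = new ValidatingSinkSketch();
        sink.invoke(4923);
        sink.invoke(4924);
        sink.invoke(4924); // a replayed record that was not deduplicated: throws here
    }
}
{code}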

> KafkaITCase.testOneToOneSources fails on Travis
> ---
>
> Key: FLINK-15611
> URL: https://issues.apache.org/jira/browse/FLINK-15611
> Project: Flink
>  Issue Type: Bug
>Reporter: Yangze Guo
>Priority: Critical
> Fix For: 1.10.0
>
>
> {{The test KafkaITCase.testOneToOneSources failed on Travis.}}
> {code:java}
> 03:15:02,019 INFO  
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl  - 
> Deleting topic scale-down-before-first-checkpoint
> 03:15:02,037 INFO  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase  - 
> 
> Test 
> testScaleDownBeforeFirstCheckpoint(org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase)
>  successfully run.
> 
> 03:15:02,038 INFO  org.apache.flink.streaming.connectors.kafka.KafkaTestBase  
>- -
> 03:15:02,038 INFO  org.apache.flink.streaming.connectors.kafka.KafkaTestBase  
>- Shut down KafkaTestBase 
> 03:15:02,038 INFO  org.apache.flink.streaming.connectors.kafka.KafkaTestBase  
>- -
> 03:15:25,728 INFO  org.apache.flink.streaming.connectors.kafka.KafkaTestBase  
>- -
> 03:15:25,728 INFO  org.apache.flink.streaming.connectors.kafka.KafkaTestBase  
>- KafkaTestBase finished
> 03:15:25,728 INFO  org.apache.flink.streaming.connectors.kafka.KafkaTestBase  
>- -
> 03:15:25.731 [INFO] Tests run: 12, Failures: 0, Errors: 0, Skipped: 0, Time 
> elapsed: 245.845 s - in 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase
> 03:15:26.099 [INFO] 
> 03:15:26.099 [INFO] Results:
> 03:15:26.099 [INFO] 
> 03:15:26.099 [ERROR] Failures: 
> 03:15:26.099 [ERROR]   
> KafkaITCase.testOneToOneSources:97->KafkaConsumerTestBase.runOneToOneExactlyOnceTest:862
>  Test failed: Job execution failed.
> {code}
> https://api.travis-ci.com/v3/job/276124537/log.txt



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14742) Unstable tests TaskExecutorTest#testSubmitTaskBeforeAcceptSlot

2020-01-16 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14742?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17017779#comment-17017779
 ] 

Biao Liu commented on FLINK-14742:
--

Such a subtle case, especially since it couldn't be reproduced :)
I have checked the test case but didn't find any clue. Nice work [~kkl0u]!

> Unstable tests TaskExecutorTest#testSubmitTaskBeforeAcceptSlot
> --
>
> Key: FLINK-14742
> URL: https://issues.apache.org/jira/browse/FLINK-14742
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: Zili Chen
>Assignee: Kostas Kloudas
>Priority: Critical
> Fix For: 1.10.0
>
>
> deadlock.
> {code}
> "main" #1 prio=5 os_prio=0 tid=0x7f1f8800b800 nid=0x356 waiting on 
> condition [0x7f1f8e65c000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0x86e9e9c0> (a 
> java.util.concurrent.CompletableFuture$Signaller)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
>   at 
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>   at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
>   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>   at 
> org.apache.flink.runtime.taskexecutor.TaskExecutorTest.testSubmitTaskBeforeAcceptSlot(TaskExecutorTest.java:1108)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
> {code}
> full log https://api.travis-ci.org/v3/job/611275566/log.txt



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15611) KafkaITCase.testOneToOneSources fails on Travis

2020-01-17 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17017789#comment-17017789
 ] 

Biao Liu commented on FLINK-15611:
--

[~jqin], could you take a look at this case?

> KafkaITCase.testOneToOneSources fails on Travis
> ---
>
> Key: FLINK-15611
> URL: https://issues.apache.org/jira/browse/FLINK-15611
> Project: Flink
>  Issue Type: Bug
>Reporter: Yangze Guo
>Priority: Critical
> Fix For: 1.10.0
>
>
> {{The test KafkaITCase.testOneToOneSources failed on Travis.}}
> {code:java}
> 03:15:02,019 INFO  
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl  - 
> Deleting topic scale-down-before-first-checkpoint
> 03:15:02,037 INFO  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase  - 
> 
> Test 
> testScaleDownBeforeFirstCheckpoint(org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase)
>  successfully run.
> 
> 03:15:02,038 INFO  org.apache.flink.streaming.connectors.kafka.KafkaTestBase  
>- -
> 03:15:02,038 INFO  org.apache.flink.streaming.connectors.kafka.KafkaTestBase  
>- Shut down KafkaTestBase 
> 03:15:02,038 INFO  org.apache.flink.streaming.connectors.kafka.KafkaTestBase  
>- -
> 03:15:25,728 INFO  org.apache.flink.streaming.connectors.kafka.KafkaTestBase  
>- -
> 03:15:25,728 INFO  org.apache.flink.streaming.connectors.kafka.KafkaTestBase  
>- KafkaTestBase finished
> 03:15:25,728 INFO  org.apache.flink.streaming.connectors.kafka.KafkaTestBase  
>- -
> 03:15:25.731 [INFO] Tests run: 12, Failures: 0, Errors: 0, Skipped: 0, Time 
> elapsed: 245.845 s - in 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase
> 03:15:26.099 [INFO] 
> 03:15:26.099 [INFO] Results:
> 03:15:26.099 [INFO] 
> 03:15:26.099 [ERROR] Failures: 
> 03:15:26.099 [ERROR]   
> KafkaITCase.testOneToOneSources:97->KafkaConsumerTestBase.runOneToOneExactlyOnceTest:862
>  Test failed: Job execution failed.
> {code}
> https://api.travis-ci.com/v3/job/276124537/log.txt



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16423) test_ha_per_job_cluster_datastream.sh gets stuck

2020-04-08 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17078057#comment-17078057
 ] 

Biao Liu commented on FLINK-16423:
--

Thanks [~rmetzger] for analyzing this so deeply. I checked the attached log. I 
believe the scenario is the same as in FLINK-16770. The problem happens while 
checkpoint 9 is being finalized. We can see that the {{CheckpointCoordinator}} 
tried to recover from checkpoint 9, so checkpoint 9 must have been added to 
{{CompletedCheckpointStore}}. However, we can't find the log "Completed 
checkpoint 9 ...", so it must have failed after being added to 
{{CompletedCheckpointStore}}, e.g. by being aborted due to the "artificial 
failure". Regarding "where are checkpoints 6, 7 and 8": since we only keep 1 
successful checkpoint in {{CompletedCheckpointStore}}, they must have been 
subsumed when checkpoint 9 was added to {{CompletedCheckpointStore}}.

The work-around fix of FLINK-16770 so far is to keep 2 successful checkpoints in 
{{CompletedCheckpointStore}} for these cases. So even if checkpoint 9 doesn't 
finish its finalization, there should at least be checkpoint 8 left.

If it gets stuck quite frequently, we could apply the work-around fix for this 
case as well. However, this bug has to be fixed completely before releasing 1.11.
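
Just to make the subsuming behaviour concrete, here is a toy sketch (not the real 
{{CompletedCheckpointStore}}): the store retains only the most recent N completed 
checkpoints and drops the older ones as new checkpoints are added, so with N=1 
adding checkpoint 9 removes 6, 7 and 8, while with N=2 checkpoint 8 would survive.

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;

// Toy sketch of "keep the N most recent completed checkpoints"; not the real store.
public class RetainedCheckpointsSketch {
    private final int maxRetained;
    private final Deque<Long> retained = new ArrayDeque<>();

    RetainedCheckpointsSketch(int maxRetained) {
        this.maxRetained = maxRetained;
    }

    void addCheckpoint(long checkpointId) {
        retained.addLast(checkpointId);
        while (retained.size() > maxRetained) {
            System.out.println("Subsuming checkpoint " + retained.removeFirst());
        }
    }

    public static void main(String[] args) {
        // With maxRetained = 1, adding checkpoint 9 subsumes 6, 7 and 8.
        // The work-around keeps 2, so checkpoint 8 would still be available.
        RetainedCheckpointsSketch store = new RetainedCheckpointsSketch(1);
        for (long id = 6; id <= 9; id++) {
            store.addCheckpoint(id);
        }
        System.out.println("Retained: " + store.retained);
    }
}
{code}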

> test_ha_per_job_cluster_datastream.sh gets stuck
> 
>
> Key: FLINK-16423
> URL: https://issues.apache.org/jira/browse/FLINK-16423
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Tests
>Affects Versions: 1.11.0
>Reporter: Robert Metzger
>Assignee: Robert Metzger
>Priority: Blocker
> Attachments: 20200408.1.tgz
>
>
> This was seen in 
> https://dev.azure.com/rmetzger/Flink/_build/results?buildId=5905&view=logs&j=b1623ac9-0979-5b0d-2e5e-1377d695c991&t=e7804547-1789-5225-2bcf-269eeaa37447
>  ... the relevant part of the logs is here:
> {code}
> 2020-03-04T11:27:25.4819486Z 
> ==
> 2020-03-04T11:27:25.4820470Z Running 'Running HA per-job cluster (rocks, 
> non-incremental) end-to-end test'
> 2020-03-04T11:27:25.4820922Z 
> ==
> 2020-03-04T11:27:25.4840177Z TEST_DATA_DIR: 
> /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-25482960156
> 2020-03-04T11:27:25.6712478Z Flink dist directory: 
> /home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT
> 2020-03-04T11:27:25.6830402Z Flink dist directory: 
> /home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT
> 2020-03-04T11:27:26.2988914Z Starting zookeeper daemon on host fv-az655.
> 2020-03-04T11:27:26.3001237Z Running on HA mode: parallelism=4, 
> backend=rocks, asyncSnapshots=true, and incremSnapshots=false.
> 2020-03-04T11:27:27.4206924Z Starting standalonejob daemon on host fv-az655.
> 2020-03-04T11:27:27.4217066Z Start 1 more task managers
> 2020-03-04T11:27:30.8412541Z Starting taskexecutor daemon on host fv-az655.
> 2020-03-04T11:27:38.1779980Z Job () is 
> running.
> 2020-03-04T11:27:38.1781375Z Running JM watchdog @ 89778
> 2020-03-04T11:27:38.1781858Z Running TM watchdog @ 89779
> 2020-03-04T11:27:38.1783272Z Waiting for text Completed checkpoint [1-9]* for 
> job  to appear 2 of times in logs...
> 2020-03-04T13:21:29.9076797Z ##[error]The operation was canceled.
> 2020-03-04T13:21:29.9094090Z ##[section]Finishing: Run e2e tests
> {code}
> The last three lines indicate that the test is waiting forever for a 
> checkpoint to appear.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-16423) test_ha_per_job_cluster_datastream.sh gets stuck

2020-04-08 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17078057#comment-17078057
 ] 

Biao Liu edited comment on FLINK-16423 at 4/8/20, 10:56 AM:


Thanks [~rmetzger] for analyzing so deeply. I checked the attached log. I 
believe the scenario is same with FLINK-16770. The problem happens when 
checkpoint 9 is doing finalization. We can see that the 
{{CheckpointCoordinator}} tried to recover from checkpoint 9. So the checkpoint 
9 must be added into {{CompletedCheckpointStore}}. However we can't find the 
log "Completed checkpoint 9 ...". It must failed after being added into 
{{CompletedCheckpointStore}}, like being aborted due to the "artificial 
failure". Regarding to "where is the checkpoint 6, 7, 8", since we only keep 1 
successful checkpoint in {{CompletedCheckpointStore}}, they must be subsumed 
when checkpoint 9 was adding into {{CompletedCheckpointStore}}.

The work-around fixing so far of FLINK-16770 is that keeping 2 successful 
checkpoints in {{CompletedCheckpointStore}} for these cases. So even if 
checkpoint 9 doesn't finish the finalization, there should be at least 
checkpoint 8 existing.

If it gets stuck quite frequently, we could apply the work-around fixing for 
the case. However this bug has to be fixed completely before releasing 1.11.


was (Author: sleepy):
Thanks [~rmetzger] for analyzing so deeply. I checked the attached log. I 
believe the scenario is same with FLINK-16770. The problem happens when 
checkpoint 9 is doing finalization. We can see that the 
{{CheckpointCoordinator}} tried to recover from checkpoint 9. So the checkpoint 
9 must be added into {{CompletedCheckpointStore}}. However we can't find the 
log "Completed checkpoint 9 ...". It must failed after being added into 
{{CompletedCheckpointStore}}, like being aborted due to the "artificial 
failure". Regarding to "where is the checkpoint 6, 7, 8", since we only keep 1 
successful checkpoint in {{CompletedCheckpointStore}}, they must be subsumed 
when checkpoint 9 was adding into {{CompletedCheckpointStore}}. 

The work-around fixing so far of FLINK-16770 is that keeping 2 successful 
checkpoints in {{CompletedCheckpointStore}] for these cases. So even if 
checkpoint 9 doesn't finish the finalization, there should be at least 
checkpoint 8 existing.

If it gets stuck quite frequently, we could apply the work-around fixing for 
the case. However this bug has to be fixed completely before releasing 1.11.

> test_ha_per_job_cluster_datastream.sh gets stuck
> 
>
> Key: FLINK-16423
> URL: https://issues.apache.org/jira/browse/FLINK-16423
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Tests
>Affects Versions: 1.11.0
>Reporter: Robert Metzger
>Assignee: Robert Metzger
>Priority: Blocker
> Attachments: 20200408.1.tgz
>
>
> This was seen in 
> https://dev.azure.com/rmetzger/Flink/_build/results?buildId=5905&view=logs&j=b1623ac9-0979-5b0d-2e5e-1377d695c991&t=e7804547-1789-5225-2bcf-269eeaa37447
>  ... the relevant part of the logs is here:
> {code}
> 2020-03-04T11:27:25.4819486Z 
> ==
> 2020-03-04T11:27:25.4820470Z Running 'Running HA per-job cluster (rocks, 
> non-incremental) end-to-end test'
> 2020-03-04T11:27:25.4820922Z 
> ==
> 2020-03-04T11:27:25.4840177Z TEST_DATA_DIR: 
> /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-25482960156
> 2020-03-04T11:27:25.6712478Z Flink dist directory: 
> /home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT
> 2020-03-04T11:27:25.6830402Z Flink dist directory: 
> /home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT
> 2020-03-04T11:27:26.2988914Z Starting zookeeper daemon on host fv-az655.
> 2020-03-04T11:27:26.3001237Z Running on HA mode: parallelism=4, 
> backend=rocks, asyncSnapshots=true, and incremSnapshots=false.
> 2020-03-04T11:27:27.4206924Z Starting standalonejob daemon on host fv-az655.
> 2020-03-04T11:27:27.4217066Z Start 1 more task managers
> 2020-03-04T11:27:30.8412541Z Starting taskexecutor daemon on host fv-az655.
> 2020-03-04T11:27:38.1779980Z Job () is 
> running.
> 2020-03-04T11:27:38.1781375Z Running JM watchdog @ 89778
> 2020-03-04T11:27:38.1781858Z Running TM watchdog @ 89779
> 2020-03-04T11:27:38.1783272Z Waiting for text Completed checkpoint [1-9]* for 
> job  to appear 2 of times in logs...
> 2020-03-04T13:21:29.9076797Z ##[error]The operation was canceled.
> 2020-03-04T13:21:29.9094090Z ##[section]Finishing: Run e2e tests
> {code}
> The l

[jira] [Commented] (FLINK-16770) Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test fails with no such file

2020-04-08 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17078067#comment-17078067
 ] 

Biao Liu commented on FLINK-16770:
--

To [~rmetzger], I think FLINK-16423 and this ticket fail in the same scenario. In 
short, the atomicity of finalizing a checkpoint is broken.
I wrote a comment in FLINK-16423.

> Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end 
> test fails with no such file
> ---
>
> Key: FLINK-16770
> URL: https://issues.apache.org/jira/browse/FLINK-16770
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Tests
>Affects Versions: 1.11.0
>Reporter: Zhijiang
>Assignee: Yun Tang
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.11.0
>
> Attachments: e2e-output.log, 
> flink-vsts-standalonesession-0-fv-az53.log
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The log : 
> [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6603&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5]
>  
> There was also a similar problem in 
> https://issues.apache.org/jira/browse/FLINK-16561, but for the case of no 
> parallelism change. And this case is for scaling up. Not quite sure whether 
> the root cause is the same one.
> {code:java}
> 2020-03-25T06:50:31.3894841Z Running 'Resuming Externalized Checkpoint 
> (rocks, incremental, scale up) end-to-end test'
> 2020-03-25T06:50:31.3895308Z 
> ==
> 2020-03-25T06:50:31.3907274Z TEST_DATA_DIR: 
> /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304
> 2020-03-25T06:50:31.5500274Z Flink dist directory: 
> /home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT
> 2020-03-25T06:50:31.6354639Z Starting cluster.
> 2020-03-25T06:50:31.8871932Z Starting standalonesession daemon on host 
> fv-az655.
> 2020-03-25T06:50:33.5021784Z Starting taskexecutor daemon on host fv-az655.
> 2020-03-25T06:50:33.5152274Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:34.5498116Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:35.6031346Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:36.9848425Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:38.0283377Z Dispatcher REST endpoint is up.
> 2020-03-25T06:50:38.0285490Z Running externalized checkpoints test, with 
> ORIGINAL_DOP=2 NEW_DOP=4 and STATE_BACKEND_TYPE=rocks 
> STATE_BACKEND_FILE_ASYNC=true STATE_BACKEND_ROCKSDB_INCREMENTAL=true 
> SIMULATE_FAILURE=false ...
> 2020-03-25T06:50:46.1754645Z Job (b8cb04e4b1e730585bc616aa352866d0) is 
> running.
> 2020-03-25T06:50:46.1758132Z Waiting for job 
> (b8cb04e4b1e730585bc616aa352866d0) to have at least 1 completed checkpoints 
> ...
> 2020-03-25T06:50:46.3478276Z Waiting for job to process up to 200 records, 
> current progress: 173 records ...
> 2020-03-25T06:50:49.6332988Z Cancelling job b8cb04e4b1e730585bc616aa352866d0.
> 2020-03-25T06:50:50.4875673Z Cancelled job b8cb04e4b1e730585bc616aa352866d0.
> 2020-03-25T06:50:50.5468230Z ls: cannot access 
> '/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304/externalized-chckpt-e2e-backend-dir/b8cb04e4b1e730585bc616aa352866d0/chk-[1-9]*/_metadata':
>  No such file or directory
> 2020-03-25T06:50:50.5606260Z Restoring job with externalized checkpoint at . 
> ...
> 2020-03-25T06:50:58.4728245Z 
> 2020-03-25T06:50:58.4732663Z 
> 
> 2020-03-25T06:50:58.4735785Z  The program finished with the following 
> exception:
> 2020-03-25T06:50:58.4737759Z 
> 2020-03-25T06:50:58.4742666Z 
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
> JobGraph.
> 2020-03-25T06:50:58.4746274Z  at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> 2020-03-25T06:50:58.4749954Z  at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> 2020-03-25T06:50:58.4752753Z  at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:142)
> 2020-03-25T06:50:58.4755400Z  at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:659)
> 2020-03-25T06:50:58.4757862Z  at 
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
> 2020-03-25T06:50:58.4760282Z  at 
> org.apache.fl

[jira] [Commented] (FLINK-16770) Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test fails with no such file

2020-04-14 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17083018#comment-17083018
 ] 

Biao Liu commented on FLINK-16770:
--

[~aljoscha], the upload to transfer.sh failed, so I can't confirm the root 
cause. It might be the same reason.

> Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end 
> test fails with no such file
> ---
>
> Key: FLINK-16770
> URL: https://issues.apache.org/jira/browse/FLINK-16770
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Tests
>Affects Versions: 1.11.0
>Reporter: Zhijiang
>Assignee: Yun Tang
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.11.0
>
> Attachments: e2e-output.log, 
> flink-vsts-standalonesession-0-fv-az53.log
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The log : 
> [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6603&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5]
>  
> There was also a similar problem in 
> https://issues.apache.org/jira/browse/FLINK-16561, but for the case of no 
> parallelism change. And this case is for scaling up. Not quite sure whether 
> the root cause is the same one.
> {code:java}
> 2020-03-25T06:50:31.3894841Z Running 'Resuming Externalized Checkpoint 
> (rocks, incremental, scale up) end-to-end test'
> 2020-03-25T06:50:31.3895308Z 
> ==
> 2020-03-25T06:50:31.3907274Z TEST_DATA_DIR: 
> /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304
> 2020-03-25T06:50:31.5500274Z Flink dist directory: 
> /home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT
> 2020-03-25T06:50:31.6354639Z Starting cluster.
> 2020-03-25T06:50:31.8871932Z Starting standalonesession daemon on host 
> fv-az655.
> 2020-03-25T06:50:33.5021784Z Starting taskexecutor daemon on host fv-az655.
> 2020-03-25T06:50:33.5152274Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:34.5498116Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:35.6031346Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:36.9848425Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:38.0283377Z Dispatcher REST endpoint is up.
> 2020-03-25T06:50:38.0285490Z Running externalized checkpoints test, with 
> ORIGINAL_DOP=2 NEW_DOP=4 and STATE_BACKEND_TYPE=rocks 
> STATE_BACKEND_FILE_ASYNC=true STATE_BACKEND_ROCKSDB_INCREMENTAL=true 
> SIMULATE_FAILURE=false ...
> 2020-03-25T06:50:46.1754645Z Job (b8cb04e4b1e730585bc616aa352866d0) is 
> running.
> 2020-03-25T06:50:46.1758132Z Waiting for job 
> (b8cb04e4b1e730585bc616aa352866d0) to have at least 1 completed checkpoints 
> ...
> 2020-03-25T06:50:46.3478276Z Waiting for job to process up to 200 records, 
> current progress: 173 records ...
> 2020-03-25T06:50:49.6332988Z Cancelling job b8cb04e4b1e730585bc616aa352866d0.
> 2020-03-25T06:50:50.4875673Z Cancelled job b8cb04e4b1e730585bc616aa352866d0.
> 2020-03-25T06:50:50.5468230Z ls: cannot access 
> '/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304/externalized-chckpt-e2e-backend-dir/b8cb04e4b1e730585bc616aa352866d0/chk-[1-9]*/_metadata':
>  No such file or directory
> 2020-03-25T06:50:50.5606260Z Restoring job with externalized checkpoint at . 
> ...
> 2020-03-25T06:50:58.4728245Z 
> 2020-03-25T06:50:58.4732663Z 
> 
> 2020-03-25T06:50:58.4735785Z  The program finished with the following 
> exception:
> 2020-03-25T06:50:58.4737759Z 
> 2020-03-25T06:50:58.4742666Z 
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
> JobGraph.
> 2020-03-25T06:50:58.4746274Z  at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> 2020-03-25T06:50:58.4749954Z  at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> 2020-03-25T06:50:58.4752753Z  at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:142)
> 2020-03-25T06:50:58.4755400Z  at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:659)
> 2020-03-25T06:50:58.4757862Z  at 
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
> 2020-03-25T06:50:58.4760282Z  at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:890

[jira] [Comment Edited] (FLINK-16770) Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test fails with no such file

2020-04-14 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17083018#comment-17083018
 ] 

Biao Liu edited comment on FLINK-16770 at 4/14/20, 9:13 AM:


[~aljoscha], the uploading to transfer.sh failed, I can't confirm the root 
cause. It might be the same reason. [~yunta], do you need some help?


was (Author: sleepy):
[~aljoscha], the uploading to transfer.sh failed, I can't confirm the root 
cause. It might be the same reason.

> Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end 
> test fails with no such file
> ---
>
> Key: FLINK-16770
> URL: https://issues.apache.org/jira/browse/FLINK-16770
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Tests
>Affects Versions: 1.11.0
>Reporter: Zhijiang
>Assignee: Yun Tang
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.11.0
>
> Attachments: e2e-output.log, 
> flink-vsts-standalonesession-0-fv-az53.log
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The log : 
> [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6603&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5]
>  
> There was also a similar problem in 
> https://issues.apache.org/jira/browse/FLINK-16561, but for the case of no 
> parallelism change. And this case is for scaling up. Not quite sure whether 
> the root cause is the same one.
> {code:java}
> 2020-03-25T06:50:31.3894841Z Running 'Resuming Externalized Checkpoint 
> (rocks, incremental, scale up) end-to-end test'
> 2020-03-25T06:50:31.3895308Z 
> ==
> 2020-03-25T06:50:31.3907274Z TEST_DATA_DIR: 
> /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304
> 2020-03-25T06:50:31.5500274Z Flink dist directory: 
> /home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT
> 2020-03-25T06:50:31.6354639Z Starting cluster.
> 2020-03-25T06:50:31.8871932Z Starting standalonesession daemon on host 
> fv-az655.
> 2020-03-25T06:50:33.5021784Z Starting taskexecutor daemon on host fv-az655.
> 2020-03-25T06:50:33.5152274Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:34.5498116Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:35.6031346Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:36.9848425Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:38.0283377Z Dispatcher REST endpoint is up.
> 2020-03-25T06:50:38.0285490Z Running externalized checkpoints test, with 
> ORIGINAL_DOP=2 NEW_DOP=4 and STATE_BACKEND_TYPE=rocks 
> STATE_BACKEND_FILE_ASYNC=true STATE_BACKEND_ROCKSDB_INCREMENTAL=true 
> SIMULATE_FAILURE=false ...
> 2020-03-25T06:50:46.1754645Z Job (b8cb04e4b1e730585bc616aa352866d0) is 
> running.
> 2020-03-25T06:50:46.1758132Z Waiting for job 
> (b8cb04e4b1e730585bc616aa352866d0) to have at least 1 completed checkpoints 
> ...
> 2020-03-25T06:50:46.3478276Z Waiting for job to process up to 200 records, 
> current progress: 173 records ...
> 2020-03-25T06:50:49.6332988Z Cancelling job b8cb04e4b1e730585bc616aa352866d0.
> 2020-03-25T06:50:50.4875673Z Cancelled job b8cb04e4b1e730585bc616aa352866d0.
> 2020-03-25T06:50:50.5468230Z ls: cannot access 
> '/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304/externalized-chckpt-e2e-backend-dir/b8cb04e4b1e730585bc616aa352866d0/chk-[1-9]*/_metadata':
>  No such file or directory
> 2020-03-25T06:50:50.5606260Z Restoring job with externalized checkpoint at . 
> ...
> 2020-03-25T06:50:58.4728245Z 
> 2020-03-25T06:50:58.4732663Z 
> 
> 2020-03-25T06:50:58.4735785Z  The program finished with the following 
> exception:
> 2020-03-25T06:50:58.4737759Z 
> 2020-03-25T06:50:58.4742666Z 
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
> JobGraph.
> 2020-03-25T06:50:58.4746274Z  at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> 2020-03-25T06:50:58.4749954Z  at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> 2020-03-25T06:50:58.4752753Z  at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:142)
> 2020-03-25T06:50:58.4755400Z  at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:659

[jira] [Commented] (FLINK-16770) Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test fails with no such file

2020-04-14 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17083325#comment-17083325
 ] 

Biao Liu commented on FLINK-16770:
--

Thanks [~rmetzger] for the reminder.
[~yunta], good job. Please let me know if you need any help.

> Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end 
> test fails with no such file
> ---
>
> Key: FLINK-16770
> URL: https://issues.apache.org/jira/browse/FLINK-16770
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Tests
>Affects Versions: 1.11.0
>Reporter: Zhijiang
>Assignee: Yun Tang
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.11.0
>
> Attachments: e2e-output.log, 
> flink-vsts-standalonesession-0-fv-az53.log
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The log : 
> [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6603&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5]
>  
> There was also a similar problem in 
> https://issues.apache.org/jira/browse/FLINK-16561, but for the case of no 
> parallelism change. And this case is for scaling up. Not quite sure whether 
> the root cause is the same one.
> {code:java}
> 2020-03-25T06:50:31.3894841Z Running 'Resuming Externalized Checkpoint 
> (rocks, incremental, scale up) end-to-end test'
> 2020-03-25T06:50:31.3895308Z 
> ==
> 2020-03-25T06:50:31.3907274Z TEST_DATA_DIR: 
> /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304
> 2020-03-25T06:50:31.5500274Z Flink dist directory: 
> /home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT
> 2020-03-25T06:50:31.6354639Z Starting cluster.
> 2020-03-25T06:50:31.8871932Z Starting standalonesession daemon on host 
> fv-az655.
> 2020-03-25T06:50:33.5021784Z Starting taskexecutor daemon on host fv-az655.
> 2020-03-25T06:50:33.5152274Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:34.5498116Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:35.6031346Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:36.9848425Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:38.0283377Z Dispatcher REST endpoint is up.
> 2020-03-25T06:50:38.0285490Z Running externalized checkpoints test, with 
> ORIGINAL_DOP=2 NEW_DOP=4 and STATE_BACKEND_TYPE=rocks 
> STATE_BACKEND_FILE_ASYNC=true STATE_BACKEND_ROCKSDB_INCREMENTAL=true 
> SIMULATE_FAILURE=false ...
> 2020-03-25T06:50:46.1754645Z Job (b8cb04e4b1e730585bc616aa352866d0) is 
> running.
> 2020-03-25T06:50:46.1758132Z Waiting for job 
> (b8cb04e4b1e730585bc616aa352866d0) to have at least 1 completed checkpoints 
> ...
> 2020-03-25T06:50:46.3478276Z Waiting for job to process up to 200 records, 
> current progress: 173 records ...
> 2020-03-25T06:50:49.6332988Z Cancelling job b8cb04e4b1e730585bc616aa352866d0.
> 2020-03-25T06:50:50.4875673Z Cancelled job b8cb04e4b1e730585bc616aa352866d0.
> 2020-03-25T06:50:50.5468230Z ls: cannot access 
> '/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304/externalized-chckpt-e2e-backend-dir/b8cb04e4b1e730585bc616aa352866d0/chk-[1-9]*/_metadata':
>  No such file or directory
> 2020-03-25T06:50:50.5606260Z Restoring job with externalized checkpoint at . 
> ...
> 2020-03-25T06:50:58.4728245Z 
> 2020-03-25T06:50:58.4732663Z 
> 
> 2020-03-25T06:50:58.4735785Z  The program finished with the following 
> exception:
> 2020-03-25T06:50:58.4737759Z 
> 2020-03-25T06:50:58.4742666Z 
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
> JobGraph.
> 2020-03-25T06:50:58.4746274Z  at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> 2020-03-25T06:50:58.4749954Z  at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> 2020-03-25T06:50:58.4752753Z  at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:142)
> 2020-03-25T06:50:58.4755400Z  at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:659)
> 2020-03-25T06:50:58.4757862Z  at 
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
> 2020-03-25T06:50:58.4760282Z  at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:890)
> 2020-03

[jira] [Commented] (FLINK-16931) Large _metadata file lead to JobManager not responding when restart

2020-04-17 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085579#comment-17085579
 ] 

Biao Liu commented on FLINK-16931:
--

Hi [~qqibrow], thanks for opening the PR. I'll try to find some time next week 
to take a look. Too many things this week, sadly :(

> Large _metadata file lead to JobManager not responding when restart
> ---
>
> Key: FLINK-16931
> URL: https://issues.apache.org/jira/browse/FLINK-16931
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / Coordination
>Affects Versions: 1.9.2, 1.10.0, 1.11.0
>Reporter: Lu Niu
>Assignee: Lu Niu
>Priority: Critical
> Fix For: 1.11.0
>
>
> When the _metadata file is big, the JobManager could never recover from the 
> checkpoint. It falls into a loop of fetch checkpoint -> JM timeout -> restart. 
> Here is the related log: 
> {code:java}
>  2020-04-01 17:08:25,689 INFO 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - 
> Recovering checkpoints from ZooKeeper.
>  2020-04-01 17:08:25,698 INFO 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Found 
> 3 checkpoints in ZooKeeper.
>  2020-04-01 17:08:25,698 INFO 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - 
> Trying to fetch 3 checkpoints from storage.
>  2020-04-01 17:08:25,698 INFO 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - 
> Trying to retrieve checkpoint 50.
>  2020-04-01 17:08:48,589 INFO 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - 
> Trying to retrieve checkpoint 51.
>  2020-04-01 17:09:12,775 INFO org.apache.flink.yarn.YarnResourceManager - The 
> heartbeat of JobManager with id 02500708baf0bb976891c391afd3d7d5 timed out.
> {code}
> Digging into the code, it looks like ExecutionGraph::restart runs in the 
> JobMaster main thread and finally calls 
> ZooKeeperCompletedCheckpointStore::retrieveCompletedCheckpoint, which downloads 
> the file from DFS. The main thread is basically blocked for a while because of 
> this. One possible solution is to make the downloading part asynchronous. More 
> things might need to be considered, as the original change tries to make it 
> single-threaded. [https://github.com/apache/flink/pull/7568]
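
A minimal, self-contained sketch of that idea (hypothetical method and executor 
names, not the actual Flink code): do the blocking download on an IO executor and 
only hand the result back to the main-thread executor.
{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncCheckpointFetchSketch {

    // Stand-in for the blocking DFS download done while retrieving a checkpoint.
    static String downloadMetadata(long checkpointId) {
        return "metadata-of-chk-" + checkpointId;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService ioExecutor = Executors.newFixedThreadPool(4);
        ExecutorService mainThreadExecutor = Executors.newSingleThreadExecutor();

        // Download on the IO executor; only the (cheap) restore step runs on the
        // "main thread" executor, so that executor is never blocked on the download.
        CompletableFuture
                .supplyAsync(() -> downloadMetadata(50), ioExecutor)
                .thenAcceptAsync(
                        metadata -> System.out.println("restoring from " + metadata),
                        mainThreadExecutor)
                .get();

        ioExecutor.shutdown();
        mainThreadExecutor.shutdown();
    }
}
{code}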



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14971) Make all the non-IO operations in CheckpointCoordinator single-threaded

2020-03-08 Thread Biao Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-14971:
-
Summary: Make all the non-IO operations in CheckpointCoordinator 
single-threaded  (was: Move ACK and declined message handling in the same 
thread with triggering)

> Make all the non-IO operations in CheckpointCoordinator single-threaded
> ---
>
> Key: FLINK-14971
> URL: https://issues.apache.org/jira/browse/FLINK-14971
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently the ACK and declined message handling are executed in IO thread. It 
> should be moved into main thread eventually.
> After this step, all operations could be executed in main thread. Also we 
> don't need coordinator-wide lock anymore then.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14971) Make all the non-IO operations in CheckpointCoordinator single-threaded

2020-03-08 Thread Biao Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-14971:
-
Description: 
Currently the ACK and declined message handling are executed in the IO thread. 
This is the only remaining part where non-IO operations are executed in the IO 
thread. It blocks introducing the main thread executor for 
{{CheckpointCoordinator}}. It would be resolved in this task.

After resolving the ACK and declined message issue, all operations could be 
executed in the main thread. We also wouldn't need the coordinator-wide lock 
anymore.




  was:
Currently the ACK and declined message handling are executed in IO thread. It 
should be moved into main thread eventually.
After this step, all operations could be executed in main thread. Also we don't 
need coordinator-wide lock anymore then.


> Make all the non-IO operations in CheckpointCoordinator single-threaded
> ---
>
> Key: FLINK-14971
> URL: https://issues.apache.org/jira/browse/FLINK-14971
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently the ACK and declined message handling are executed in IO thread. 
> This is the only rest part that non-IO operations are executed in IO thread. 
> It blocks introducing main thread executor for {{CheckpointCoordinator}}. It 
> would be resolved in this task.
> After resolving the ACK and declined message issue, all operations could be 
> executed in main thread. Also we don't need coordinator-wide lock anymore 
> then.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14971) Make all the non-IO operations in CheckpointCoordinator single-threaded

2020-03-08 Thread Biao Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-14971:
-
Description: 
Currently the ACK and declined message handling are executed in the IO thread. 
This is the only remaining part where non-IO operations are executed in the IO 
thread. It blocks introducing the main thread executor for 
{{CheckpointCoordinator}}. It would be resolved in this task.

After resolving the ACK and declined message issue, the main thread executor 
would be introduced into {{CheckpointCoordinator}} to replace the timer thread. 
However, the timer thread would be kept (maybe temporarily, for a while) to 
schedule periodic triggering, since FLINK-13848 is not accepted yet.

  was:
Currently the ACK and declined message handling are executed in IO thread. This 
is the only rest part that non-IO operations are executed in IO thread. It 
blocks introducing main thread executor for {{CheckpointCoordinator}}. It would 
be resolved in this task.

After resolving the ACK and declined message issue, all operations could be 
executed in main thread. Also we don't need coordinator-wide lock anymore then.





> Make all the non-IO operations in CheckpointCoordinator single-threaded
> ---
>
> Key: FLINK-14971
> URL: https://issues.apache.org/jira/browse/FLINK-14971
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently the ACK and declined message handling are executed in IO thread. 
> This is the only rest part that non-IO operations are executed in IO thread. 
> It blocks introducing main thread executor for {{CheckpointCoordinator}}. It 
> would be resolved in this task.
> After resolving the ACK and declined message issue, the main thread executor 
> would be introduced into {{CheckpointCoordinator}} to instead of timer 
> thread. However the timer thread would be kept (maybe for a while 
> temporarily) to schedule periodic triggering, since FLINK-13848 is not 
> accepted yet.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16561) Resuming Externalized Checkpoint (rocks, incremental, no parallelism change) end-to-end test fails on Azure

2020-03-11 Thread Biao Liu (Jira)
Biao Liu created FLINK-16561:


 Summary: Resuming Externalized Checkpoint (rocks, incremental, no 
parallelism change) end-to-end test fails on Azure
 Key: FLINK-16561
 URL: https://issues.apache.org/jira/browse/FLINK-16561
 Project: Flink
  Issue Type: Test
  Components: Tests
Affects Versions: 1.11.0
Reporter: Biao Liu


{quote}Caused by: java.io.IOException: Cannot access file system for 
checkpoint/savepoint path 'file://.'.
at 
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpointPointer(AbstractFsCheckpointStorage.java:233)
at 
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpoint(AbstractFsCheckpointStorage.java:110)
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1332)
at 
org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:314)
at 
org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:247)
at 
org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:223)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:118)
at 
org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103)
at 
org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:281)
at 
org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:269)
at 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
at 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
at 
org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:146)
... 10 more
Caused by: java.io.IOException: Found local file path with authority '.' in 
path 'file://.'. Hint: Did you forget a slash? (correct path would be 
'file:///.')
at 
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:441)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:389)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
at 
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpointPointer(AbstractFsCheckpointStorage.java:230)
... 22 more
{quote}

The original log is here, 
https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6073&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=2b7514ee-e706-5046-657b-3430666e7bd9

There are some similar tickets about this case, but the stack here looks 
different. 
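
For reference, a small self-contained sketch (plain JDK URI parsing, just to 
illustrate the hint in the exception) of why 'file://.' is rejected: with only 
two slashes the '.' is parsed as the URI authority instead of the path.
{code:java}
import java.net.URI;

public class FileUriSketch {
    public static void main(String[] args) {
        URI twoSlashes = URI.create("file://.");    // authority is ".", path is empty
        URI threeSlashes = URI.create("file:///."); // no authority, path is "/."

        System.out.println(twoSlashes.getAuthority() + " | " + twoSlashes.getPath());
        System.out.println(threeSlashes.getAuthority() + " | " + threeSlashes.getPath());
    }
}
{code}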



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14971) Make all the non-IO operations in CheckpointCoordinator single-threaded

2020-06-09 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17129274#comment-17129274
 ] 

Biao Liu commented on FLINK-14971:
--

I spent a bit of time recalling Stephan's summary.

The first step "(1) When the checkpoint is ready (all tasks acked, metadata 
written out), Checkpoint Coordinator transfers ownership to the 
CompletedCheckpointStore" sounds like a good idea to me. Once a checkpoint is 
ready, it can't be cancelled; otherwise we would need to think about how to 
revert {{CompletedCheckpointStore}}. That simplifies the scenario a lot.

Let's focus on the second step. If I understand correctly, option (b) is a bit 
subtle. If the JM recovers from a checkpoint (N) which has not been persisted to 
ZK, and then the JM process is gone, the new JM would recover from checkpoint 
N-1. I'm not sure there is no side effect at all on either the JM or the TM 
side, but my gut feeling is that it might be a dangerous semantic. It might 
break the assumptions of some users.

Option (a) is the most feasible one for me. There are some facts behind this 
solution, please correct me if I'm wrong.
 1. The asynchronous committing to {{CompletedCheckpointStore}} must be done 
first; only then does {{CheckpointCoordinator}} notify the tasks that the 
checkpoint is completed (a sketch of this ordering follows below). Otherwise the 
rule "NOTE: It is not fine to ignore it and start from an earlier checkpoint if 
it will get committed later. That is the bug to prevent" might be broken. The 
corner case is: {{CheckpointCoordinator}} first notifies the tasks that 
checkpoint N is completed, then commits to ZK asynchronously (not successful 
yet), and the JM process is gone. A new JM process starts and recovers from 
checkpoint N-1, because N-1 is the last successful checkpoint recorded in ZK.
 2. If the job fails before the asynchronous committing completes, 
{{CheckpointCoordinator}} needs to decide how to handle that committing. When 
the committing completes, the JM might be stuck in restoring or other steps 
(like cancelling tasks). I see two options. Option A is failing this checkpoint: 
revert {{CheckpointCoordinator}} and do not subsume older checkpoints (which is 
described in FLINK-16770). Option B is treating this checkpoint as a successful 
one but not notifying the tasks; because the tasks are cancelling or waiting to 
be restarted, the notification is meaningless. I think option B is simpler, 
better, and also acceptable, because the notification of checkpoint completion 
is not guaranteed anyway.
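
To illustrate point 1, a minimal self-contained sketch (hypothetical names, not 
the actual {{CheckpointCoordinator}} code) of committing to the store 
asynchronously and only notifying the tasks once the commit has succeeded:
{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CommitThenNotifySketch {

    // Stand-in for the (blocking) commit to ZooKeeper / the completed checkpoint store.
    static void commitToStore(long checkpointId) {
        System.out.println("committed checkpoint " + checkpointId + " to the store");
    }

    // Stand-in for sending "notify checkpoint complete" to the tasks.
    static void notifyTasks(long checkpointId) {
        System.out.println("notified tasks about checkpoint " + checkpointId);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
        ExecutorService mainThreadExecutor = Executors.newSingleThreadExecutor();
        long checkpointId = 42L;

        // Commit on the IO executor first; notify the tasks only after the commit
        // succeeded, back on the main-thread executor. If the commit fails, the
        // notification is never sent.
        CompletableFuture
                .runAsync(() -> commitToStore(checkpointId), ioExecutor)
                .thenRunAsync(() -> notifyTasks(checkpointId), mainThreadExecutor)
                .get();

        ioExecutor.shutdown();
        mainThreadExecutor.shutdown();
    }
}
{code}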

> Make all the non-IO operations in CheckpointCoordinator single-threaded
> ---
>
> Key: FLINK-14971
> URL: https://issues.apache.org/jira/browse/FLINK-14971
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently the ACK and declined message handling are executed in IO thread. 
> This is the only rest part that non-IO operations are executed in IO thread. 
> It blocks introducing main thread executor for {{CheckpointCoordinator}}. It 
> would be resolved in this task.
> After resolving the ACK and declined message issue, the main thread executor 
> would be introduced into {{CheckpointCoordinator}} to instead of timer 
> thread. However the timer thread would be kept (maybe for a while 
> temporarily) to schedule periodic triggering, since FLINK-13848 is not 
> accepted yet.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16770) Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test fails with no such file

2020-04-25 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17092098#comment-17092098
 ] 

Biao Liu commented on FLINK-16770:
--

Technically speaking, the scenario we discussed here should not happen with the 
reverted code. The finalization of a checkpoint is reverted to being executed 
synchronously and wrapped in the coordinator-wide lock, so there shouldn't be 
any race condition at all. On the other hand, the earlier commits of the 
refactoring were merged over 3 months ago. So to answer the question of 
[~pnowojski], I think we have reverted enough commits.
I have noticed that there are some logs:
{quote}kill: usage: kill [-s sigspec | -n signum | -sigspec] pid | jobspec ... 
or kill -l [sigspec]
 Killed TM @
{quote}
It seems that there is no TM process at some point. I guess that's not a normal 
scenario. The {{ha_tm_watchdog}} in common_ha.sh should start a new TM before 
killing an old one in this case. What if there is no TM process at all, exited 
or killed unexpectedly? I'm not sure. I think there would not be enough TMs to 
finish the test case, because the {{ha_tm_watchdog}} only starts a new TM if 
there are already enough TMs,
{quote}local MISSING_TMS=$((EXPECTED_TMS-RUNNING_TMS))
 if [ ${MISSING_TMS} -eq 0 ]; then
 # start a new TM only if we have exactly the expected number
 "$FLINK_DIR"/bin/taskmanager.sh start > /dev/null
 fi{quote}
I guess the failure cause is a different one, maybe related to the "no TM 
process". But I can't tell what really happened in this case without any other 
logs. Is there any way we could find the JM logs? [~rmetzger]

> Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end 
> test fails with no such file
> ---
>
> Key: FLINK-16770
> URL: https://issues.apache.org/jira/browse/FLINK-16770
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Tests
>Affects Versions: 1.11.0
>Reporter: Zhijiang
>Assignee: Yun Tang
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.11.0
>
> Attachments: e2e-output.log, 
> flink-vsts-standalonesession-0-fv-az53.log, image-2020-04-16-11-24-54-549.png
>
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> The log : 
> [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6603&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5]
>  
> There was also the similar problem in 
> https://issues.apache.org/jira/browse/FLINK-16561, but for the case of no 
> parallelism change. And this case is for scaling up. Not quite sure whether 
> the root cause is the same one.
> {code:java}
> 2020-03-25T06:50:31.3894841Z Running 'Resuming Externalized Checkpoint 
> (rocks, incremental, scale up) end-to-end test'
> 2020-03-25T06:50:31.3895308Z 
> ==
> 2020-03-25T06:50:31.3907274Z TEST_DATA_DIR: 
> /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304
> 2020-03-25T06:50:31.5500274Z Flink dist directory: 
> /home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT
> 2020-03-25T06:50:31.6354639Z Starting cluster.
> 2020-03-25T06:50:31.8871932Z Starting standalonesession daemon on host 
> fv-az655.
> 2020-03-25T06:50:33.5021784Z Starting taskexecutor daemon on host fv-az655.
> 2020-03-25T06:50:33.5152274Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:34.5498116Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:35.6031346Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:36.9848425Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:38.0283377Z Dispatcher REST endpoint is up.
> 2020-03-25T06:50:38.0285490Z Running externalized checkpoints test, with 
> ORIGINAL_DOP=2 NEW_DOP=4 and STATE_BACKEND_TYPE=rocks 
> STATE_BACKEND_FILE_ASYNC=true STATE_BACKEND_ROCKSDB_INCREMENTAL=true 
> SIMULATE_FAILURE=false ...
> 2020-03-25T06:50:46.1754645Z Job (b8cb04e4b1e730585bc616aa352866d0) is 
> running.
> 2020-03-25T06:50:46.1758132Z Waiting for job 
> (b8cb04e4b1e730585bc616aa352866d0) to have at least 1 completed checkpoints 
> ...
> 2020-03-25T06:50:46.3478276Z Waiting for job to process up to 200 records, 
> current progress: 173 records ...
> 2020-03-25T06:50:49.6332988Z Cancelling job b8cb04e4b1e730585bc616aa352866d0.
> 2020-03-25T06:50:50.4875673Z Cancelled job b8cb04e4b1e730585bc616aa352866d0.
> 2020-03-25T06:50:50.5468230Z ls: cannot access 
> '/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304/externalized-chckpt-e2e-backend-dir/b8cb04e4b1e730585bc616aa352

[jira] [Comment Edited] (FLINK-16770) Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test fails with no such file

2020-04-25 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17092098#comment-17092098
 ] 

Biao Liu edited comment on FLINK-16770 at 4/25/20, 7:12 AM:


Technically speaking, the scenario we discussed here should not happen with the 
reverted commits. The finalization of a checkpoint is reverted to being executed 
synchronously and wrapped in the coordinator-wide lock, so there shouldn't be 
any race condition at all. On the other hand, the earlier commits of the 
refactoring were merged over 3 months ago. So to answer the question of 
[~pnowojski], I think we have reverted enough commits.

I have noticed that there are some logs:
{quote}kill: usage: kill [-s sigspec | -n signum | -sigspec] pid | jobspec ... 
or kill -l [sigspec]
 Killed TM @
{quote}
It seems that there is no TM process at some point. I guess that's not a normal 
scenario. The {{ha_tm_watchdog}} in common_ha.sh should start a new TM before 
killing an old one in this case. What if there is no TM process at all, exited 
or killed unexpectedly? I'm not sure. I think there would not be enough TMs to 
finish the test case, because the {{ha_tm_watchdog}} only starts a new TM if 
there are already enough TMs,
{quote}local MISSING_TMS=$((EXPECTED_TMS-RUNNING_TMS))
 if [ ${MISSING_TMS} -eq 0 ]; then
 # start a new TM only if we have exactly the expected number
 "$FLINK_DIR"/bin/taskmanager.sh start > /dev/null
 fi
{quote}
I guess the failure cause is a different one, maybe related to the "no TM 
process". But I can't tell what really happened in this case without any other 
logs. Is there any way we could find the JM logs? [~rmetzger]


was (Author: sleepy):
Technically speaking, the scenario we discussed here should not happen with the 
reverted codes. The finalization of checkpoint is reverted to be executed 
synchronously and wrapped in the coordinator-wide lock. There shouldn't be race 
condition at all. On the other hand, the earlier commits of the refactoring are 
merged over 3 months ago. So to answer the question of [~pnowojski], I think we 
have reverted enough commits.

I have noticed that there are some logs:
{quote}kill: usage: kill [-s sigspec | -n signum | -sigspec] pid | jobspec ... 
or kill -l [sigspec]
 Killed TM @
{quote}
It seems that there is no TM process at some time. I guess it's not a normal 
scenario. The {{ha_tm_watchdog}} in common_ha.sh should start a new TM before 
killing an old one in this case. What if there is no TM process at all? Exited 
or killed unexpectedly? I'm not sure. I think there will be no enough TM to 
finish the testing case. Because the {{ha_tm_watchdog}} only starts a new TM if 
there are enough TMs,
{quote}local MISSING_TMS=$((EXPECTED_TMS-RUNNING_TMS))
 if [ ${MISSING_TMS} -eq 0 ]; then
 # start a new TM only if we have exactly the expected number
 "$FLINK_DIR"/bin/taskmanager.sh start > /dev/null
 fi{quote}
I guess the failure cause is another one, maybe it's relevant to the "no TM 
process". But I can't tell what really happened in this case without any other 
logs. Is there any way we could find the JM logs? [~rmetzger]

> Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end 
> test fails with no such file
> ---
>
> Key: FLINK-16770
> URL: https://issues.apache.org/jira/browse/FLINK-16770
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Tests
>Affects Versions: 1.11.0
>Reporter: Zhijiang
>Assignee: Yun Tang
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.11.0
>
> Attachments: e2e-output.log, 
> flink-vsts-standalonesession-0-fv-az53.log, image-2020-04-16-11-24-54-549.png
>
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> The log : 
> [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6603&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5]
>  
> There was also the similar problem in 
> https://issues.apache.org/jira/browse/FLINK-16561, but for the case of no 
> parallelism change. And this case is for scaling up. Not quite sure whether 
> the root cause is the same one.
> {code:java}
> 2020-03-25T06:50:31.3894841Z Running 'Resuming Externalized Checkpoint 
> (rocks, incremental, scale up) end-to-end test'
> 2020-03-25T06:50:31.3895308Z 
> ==
> 2020-03-25T06:50:31.3907274Z TEST_DATA_DIR: 
> /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304
> 2020-03-25T06:50:31.5500274Z Flink dist directory: 
> /home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT
> 2020-03-25T06:50:

[jira] [Updated] (FLINK-9900) Fix unstable test ZooKeeperHighAvailabilityITCase#testRestoreBehaviourWithFaultyStateHandles

2020-05-06 Thread Biao Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-9900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-9900:

Attachment: mvn-2.log

> Fix unstable test 
> ZooKeeperHighAvailabilityITCase#testRestoreBehaviourWithFaultyStateHandles
> 
>
> Key: FLINK-9900
> URL: https://issues.apache.org/jira/browse/FLINK-9900
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.5.1, 1.6.0, 1.9.0
>Reporter: zhangminglei
>Assignee: Biao Liu
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.9.1, 1.10.0
>
> Attachments: mvn-2.log
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> https://api.travis-ci.org/v3/job/405843617/log.txt
> Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 124.598 sec 
> <<< FAILURE! - in 
> org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase
>  
> testRestoreBehaviourWithFaultyStateHandles(org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase)
>  Time elapsed: 120.036 sec <<< ERROR!
>  org.junit.runners.model.TestTimedOutException: test timed out after 12 
> milliseconds
>  at sun.misc.Unsafe.park(Native Method)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
>  at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>  at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
>  at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>  at 
> org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase.testRestoreBehaviourWithFaultyStateHandles(ZooKeeperHighAvailabilityITCase.java:244)
> Results :
> Tests in error: 
>  
> ZooKeeperHighAvailabilityITCase.testRestoreBehaviourWithFaultyStateHandles:244
>  » TestTimedOut
> Tests run: 1453, Failures: 0, Errors: 1, Skipped: 29



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-9900) Fix unstable test ZooKeeperHighAvailabilityITCase#testRestoreBehaviourWithFaultyStateHandles

2020-05-06 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-9900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17101335#comment-17101335
 ] 

Biao Liu commented on FLINK-9900:
-

Thanks [~rmetzger] for reporting.
This time, the case failed to submit the job to the cluster. The cluster didn't 
start the job within 10 seconds, so the timeout kicked in. It's hard to say 
which step it got stuck in. The last log of {{JobMaster}} is "Configuring 
application-defined state backend with job/cluster config". I have attached the 
relevant log (mvn-2.log).
[~trohrmann] do you have any idea?

> Fix unstable test 
> ZooKeeperHighAvailabilityITCase#testRestoreBehaviourWithFaultyStateHandles
> 
>
> Key: FLINK-9900
> URL: https://issues.apache.org/jira/browse/FLINK-9900
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.5.1, 1.6.0, 1.9.0
>Reporter: zhangminglei
>Assignee: Biao Liu
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.9.1, 1.10.0
>
> Attachments: mvn-2.log
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> https://api.travis-ci.org/v3/job/405843617/log.txt
> Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 124.598 sec 
> <<< FAILURE! - in 
> org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase
>  
> testRestoreBehaviourWithFaultyStateHandles(org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase)
>  Time elapsed: 120.036 sec <<< ERROR!
>  org.junit.runners.model.TestTimedOutException: test timed out after 12 
> milliseconds
>  at sun.misc.Unsafe.park(Native Method)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
>  at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>  at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
>  at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>  at 
> org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase.testRestoreBehaviourWithFaultyStateHandles(ZooKeeperHighAvailabilityITCase.java:244)
> Results :
> Tests in error: 
>  
> ZooKeeperHighAvailabilityITCase.testRestoreBehaviourWithFaultyStateHandles:244
>  » TestTimedOut
> Tests run: 1453, Failures: 0, Errors: 1, Skipped: 29



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18137) JobMasterTriggerSavepointITCase.testStopJobAfterSavepoint fails with AskTimeoutException

2020-06-11 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1717#comment-1717
 ] 

Biao Liu commented on FLINK-18137:
--

I just saw this issue. I think [~trohrmann] is right.
There is a problem with the if/else in 
[https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L547].
 The {{throwable}} passed to {{onTriggerFailure}} can unexpectedly be null. 
Actually that's my fault: I wrote this code and realized the issue some days 
ago. I was planning to fix it in a later PR, because I had checked at the time 
that it couldn't raise an NPE, so I didn't consider it urgent. However, 
FLINK-16770 broke that plan; I reverted a lot of code and forgot to fix this 
potential issue separately. Unfortunately 
https://github.com/apache/flink/commit/1af33f1285d557f0171f4587d7f4e789df27e7cb 
hits this NPE. {{onTriggerFailure}} shouldn't throw any exception by design.
The code has changed a bit since my last commit. I need to double-check the 
comment mentioned by [~roman_khachatryan] to make sure there is no other issue.
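
A minimal self-contained sketch (hypothetical method, not the actual 
{{CheckpointCoordinator}} code) of the kind of null guard that keeps such a 
failure callback from throwing:
{code:java}
public class TriggerFailureGuardSketch {

    // Hypothetical failure callback: it must never throw, even when the caller
    // passes null, so a generic fallback cause is substituted for null.
    static void onTriggerFailure(Throwable throwable) {
        final Throwable cause =
                throwable != null
                        ? throwable
                        : new IllegalStateException(
                                "Checkpoint triggering failed without a cause");
        System.err.println("Failed to trigger checkpoint: " + cause.getMessage());
    }

    public static void main(String[] args) {
        onTriggerFailure(null);                          // must not raise an NPE
        onTriggerFailure(new RuntimeException("boom"));  // normal path
    }
}
{code}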


> JobMasterTriggerSavepointITCase.testStopJobAfterSavepoint fails with 
> AskTimeoutException
> 
>
> Key: FLINK-18137
> URL: https://issues.apache.org/jira/browse/FLINK-18137
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / Coordination, Runtime 
> / Task, Tests
>Affects Versions: 1.11.0, 1.12.0
>Reporter: Robert Metzger
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.11.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=2747&view=logs&j=5c8e7682-d68f-54d1-16a2-a09310218a49&t=45cc9205-bdb7-5b54-63cd-89fdc0983323
> {code}
> 2020-06-04T16:17:20.4404189Z [ERROR] Tests run: 4, Failures: 0, Errors: 1, 
> Skipped: 0, Time elapsed: 14.352 s <<< FAILURE! - in 
> org.apache.flink.runtime.jobmaster.JobMasterTriggerSavepointITCase
> 2020-06-04T16:17:20.4405548Z [ERROR] 
> testStopJobAfterSavepoint(org.apache.flink.runtime.jobmaster.JobMasterTriggerSavepointITCase)
>   Time elapsed: 10.058 s  <<< ERROR!
> 2020-06-04T16:17:20.4407342Z java.util.concurrent.ExecutionException: 
> java.util.concurrent.TimeoutException: Invocation of public default 
> java.util.concurrent.CompletableFuture 
> org.apache.flink.runtime.webmonitor.RestfulGateway.triggerSavepoint(org.apache.flink.api.common.JobID,java.lang.String,boolean,org.apache.flink.api.common.time.Time)
>  timed out.
> 2020-06-04T16:17:20.4409562Z  at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> 2020-06-04T16:17:20.4410333Z  at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> 2020-06-04T16:17:20.4411259Z  at 
> org.apache.flink.runtime.jobmaster.JobMasterTriggerSavepointITCase.cancelWithSavepoint(JobMasterTriggerSavepointITCase.java:264)
> 2020-06-04T16:17:20.4412292Z  at 
> org.apache.flink.runtime.jobmaster.JobMasterTriggerSavepointITCase.testStopJobAfterSavepoint(JobMasterTriggerSavepointITCase.java:127)
> 2020-06-04T16:17:20.4413163Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2020-06-04T16:17:20.4413990Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2020-06-04T16:17:20.4414783Z  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2020-06-04T16:17:20.4415936Z  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2020-06-04T16:17:20.4416693Z  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 2020-06-04T16:17:20.4417632Z  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2020-06-04T16:17:20.4418637Z  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 2020-06-04T16:17:20.4419367Z  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2020-06-04T16:17:20.4420118Z  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> 2020-06-04T16:17:20.4420742Z  at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> 2020-06-04T16:17:20.4421909Z  at 
> org.junit.rules.RunRules.evaluate(RunRules.java:20)
> 2020-06-04T16:17:20.4422493Z  at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> 2020-06-04T16:17:20.4423247Z  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> 2020-06-04T16:17:20.4424263Z  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> 2020-06-04T16:17:20.4424876Z  at 
> org.junit.runners.P

[jira] [Commented] (FLINK-17073) Slow checkpoint cleanup causing OOMs

2020-07-05 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17151618#comment-17151618
 ] 

Biao Liu commented on FLINK-17073:
--

Hi [~echauchot], thanks for doing so much. I left a couple of comments in the 
design doc. The second proposal seems to be a reliable and lightweight solution :)

> Slow checkpoint cleanup causing OOMs
> 
>
> Key: FLINK-17073
> URL: https://issues.apache.org/jira/browse/FLINK-17073
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.7.3, 1.8.0, 1.9.0, 1.10.0, 1.11.0
>Reporter: Till Rohrmann
>Priority: Major
> Fix For: 1.12.0
>
>
> A user reported that he sees a decline in checkpoint cleanup speed when 
> upgrading from Flink 1.7.2 to 1.10.0. The result is that a lot of cleanup 
> tasks are waiting in the execution queue occupying memory. Ultimately, the JM 
> process dies with an OOM.
> Compared to Flink 1.7.2, we introduced a dedicated {{ioExecutor}} which is 
> used by the {{HighAvailabilityServices}} (FLINK-11851). Before, we use the 
> {{AkkaRpcService}} thread pool which was a {{ForkJoinPool}} with a max 
> parallelism of 64. Now it is a {{FixedThreadPool}} with as many threads as 
> CPU cores. This change might have caused the decline in completed checkpoint 
> discard throughput. This suspicion needs to be validated before trying to fix 
> it!
> [1] 
> https://lists.apache.org/thread.html/r390e5d775878918edca0b6c9f18de96f828c266a888e34ed30ce8494%40%3Cuser.flink.apache.org%3E



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17073) Slow checkpoint cleanup causing OOMs

2020-07-20 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17161657#comment-17161657
 ] 

Biao Liu commented on FLINK-17073:
--

[~echauchot], sorry, I don't have the permission to assign issues. 
[~pnowojski], could you help assign the ticket to him?

> Slow checkpoint cleanup causing OOMs
> 
>
> Key: FLINK-17073
> URL: https://issues.apache.org/jira/browse/FLINK-17073
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.7.3, 1.8.0, 1.9.0, 1.10.0, 1.11.0
>Reporter: Till Rohrmann
>Priority: Major
> Fix For: 1.12.0
>
>
> A user reported that he sees a decline in checkpoint cleanup speed when 
> upgrading from Flink 1.7.2 to 1.10.0. The result is that a lot of cleanup 
> tasks are waiting in the execution queue occupying memory. Ultimately, the JM 
> process dies with an OOM.
> Compared to Flink 1.7.2, we introduced a dedicated {{ioExecutor}} which is 
> used by the {{HighAvailabilityServices}} (FLINK-11851). Before, we use the 
> {{AkkaRpcService}} thread pool which was a {{ForkJoinPool}} with a max 
> parallelism of 64. Now it is a {{FixedThreadPool}} with as many threads as 
> CPU cores. This change might have caused the decline in completed checkpoint 
> discard throughput. This suspicion needs to be validated before trying to fix 
> it!
> [1] 
> https://lists.apache.org/thread.html/r390e5d775878918edca0b6c9f18de96f828c266a888e34ed30ce8494%40%3Cuser.flink.apache.org%3E



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18641) "Failure to finalize checkpoint" error in MasterTriggerRestoreHook

2020-07-22 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17162553#comment-17162553
 ] 

Biao Liu commented on FLINK-18641:
--

Thanks [~becket_qin] for analyzing this issue. The asynchronous checkpoint 
threading model breaks an assumption of {{ExternallyInducedSource}}: the 
checkpoint could be triggered before {{MasterTriggerRestoreHook}} finishes the 
trigger future. We can find a way to guarantee the ordering. However, that is 
not friendly to the scenario of doing some initialization or preparation in 
{{MasterTriggerRestoreHook}}, because the checkpoint might be triggered before 
that initialization or preparation finishes. Hope nobody uses it like that :(

Currently the semantics of {{ExternallyInducedSource}} are tightly bound to the 
implementation of Flink checkpointing, which should be avoided IMO. I think we 
should redesign {{ExternallyInducedSource}} as a long-term goal.

To [~becket_qin], do you already have any idea for fixing it? If not, I could 
help to fix it.

BTW, this change of {{CheckpointCoordinator}} was introduced in 1.10. Is it 
possible that the test failure is exposed by the change of 
{{OperatorCoordinator}}? We added another asynchronous step between master hook 
triggering and task triggering. I'm not sure whether an {{OperatorCoordinator}} 
has to be added in the scenario of the Pravega connector tests. If not, a 
work-around would be to wait for the future returned by 
{{MasterTriggerRestoreHook.triggerCheckpoint}} to finish before triggering the 
task checkpoint (I assume there is only one master hook in this case; a sketch 
follows below). 
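
A minimal self-contained sketch (hypothetical names, not the actual coordinator 
code) of that work-around: complete the master hook's trigger future first, and 
only then trigger the task checkpoint.
{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MasterHookOrderingSketch {

    // Stand-in for MasterTriggerRestoreHook#triggerCheckpoint returning a future.
    static CompletableFuture<Void> triggerMasterHook(long checkpointId, ExecutorService executor) {
        return CompletableFuture.runAsync(
                () -> System.out.println("master hook finished for checkpoint " + checkpointId),
                executor);
    }

    // Stand-in for sending the trigger message to the tasks.
    static void triggerTasks(long checkpointId) {
        System.out.println("tasks triggered for checkpoint " + checkpointId);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        long checkpointId = 3L;

        // Chain task triggering strictly after the master hook future completes.
        triggerMasterHook(checkpointId, executor)
                .thenRun(() -> triggerTasks(checkpointId))
                .get();

        executor.shutdown();
    }
}
{code}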

> "Failure to finalize checkpoint" error in MasterTriggerRestoreHook
> --
>
> Key: FLINK-18641
> URL: https://issues.apache.org/jira/browse/FLINK-18641
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.11.0
>Reporter: Brian Zhou
>Priority: Major
>
> https://github.com/pravega/flink-connectors is a Pravega connector for Flink. 
> The ReaderCheckpointHook[1] class uses the Flink `MasterTriggerRestoreHook` 
> interface to trigger the Pravega checkpoint during Flink checkpoints to make 
> sure the data recovery. The checkpoint recovery tests are running fine in 
> Flink 1.10, but it has below issues in Flink 1.11 causing the tests time out. 
> Suspect it is related to the checkpoint coordinator thread model changes in 
> Flink 1.11
> Error stacktrace:
> {code}
> 2020-07-09 15:39:39,999 30945 [jobmanager-future-thread-5] WARN  
> o.a.f.runtime.jobmaster.JobMaster - Error while processing checkpoint 
> acknowledgement message
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize 
> the pending checkpoint 3. Failure reason: Failure to finalize checkpoint.
>  at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1033)
>  at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:948)
>  at 
> org.apache.flink.runtime.scheduler.SchedulerBase.lambda$acknowledgeCheckpoint$4(SchedulerBase.java:802)
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.SerializedThrowable: Pending checkpoint has 
> not been fully acknowledged yet
>  at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>  at 
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:298)
>  at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1021)
>  ... 9 common frames omitted
> {code}
> More detail in this mailing thread: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Pravega-connector-cannot-recover-from-the-checkpoint-due-to-quot-Failure-to-finalize-checkpoint-quot-td36652.html
> Also in https://github.com/pravega/flink-connectors/issues/387



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18641) "Failure to finalize checkpoint" error in MasterTriggerRestoreHook

2020-07-23 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163341#comment-17163341
 ] 

Biao Liu commented on FLINK-18641:
--

Ah, sorry [~pnowojski], I forgot that this change had not been submitted to 
release-1.10 at that time; it was only submitted to master.

[~becket_qin], thanks for correcting it. There are two different things in your 
response, the master hook and {{OperatorCoordinator}}. Regarding this ticket, 
it's not caused by the {{OperatorCoordinator}} part, right? I think we could 
file another ticket for that and discuss it there.

{quote}By design, the checkpoint should always actually take the snapshot of 
the master hooks and OperatorCoordinator first before taking the checkpoint on 
the tasks.{quote}
Technically speaking, there are no clear semantics saying that the master hook 
snapshot must be taken before task snapshotting. For 
{{ExternallyInducedSource}}, the task snapshot might be taken before the master 
hook finishes the future it returned. And if there are multiple master hooks, 
some hooks might be invoked after task snapshotting. It's somewhat concurrent. I 
don't think we should/could guarantee the ordering here.

Anyway, we have to fix the issue of {{ExternallyInducedSource}} caused by the 
ordering.
Regarding the fixing plan: I'm not sure how heavy the fix on the 
{{OperatorCoordinator}} side might be. If it's not a simple fix, it might be 
better to separate these things into different patches. As a quick fix for this 
ticket, we could invoke the master hooks synchronously while retaining the 
coordinator-wide lock, just like before.

{quote}I agree that In long run, the operator coordinator can actually 
supersede the master hooks. So we can probably mark the master hooks as 
deprecated.{quote}
Totally agree!

> "Failure to finalize checkpoint" error in MasterTriggerRestoreHook
> --
>
> Key: FLINK-18641
> URL: https://issues.apache.org/jira/browse/FLINK-18641
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.11.0
>Reporter: Brian Zhou
>Priority: Major
>
> https://github.com/pravega/flink-connectors is a Pravega connector for Flink. 
> The ReaderCheckpointHook[1] class uses the Flink `MasterTriggerRestoreHook` 
> interface to trigger the Pravega checkpoint during Flink checkpoints to make 
> sure the data recovery. The checkpoint recovery tests are running fine in 
> Flink 1.10, but it has below issues in Flink 1.11 causing the tests time out. 
> Suspect it is related to the checkpoint coordinator thread model changes in 
> Flink 1.11
> Error stacktrace:
> {code}
> 2020-07-09 15:39:39,999 30945 [jobmanager-future-thread-5] WARN  
> o.a.f.runtime.jobmaster.JobMaster - Error while processing checkpoint 
> acknowledgement message
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize 
> the pending checkpoint 3. Failure reason: Failure to finalize checkpoint.
>  at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1033)
>  at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:948)
>  at 
> org.apache.flink.runtime.scheduler.SchedulerBase.lambda$acknowledgeCheckpoint$4(SchedulerBase.java:802)
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.SerializedThrowable: Pending checkpoint has 
> not been fully acknowledged yet
>  at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>  at 
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:298)
>  at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1021)
>  ... 9 common frames omitted
> {code}
> More detail in this mailing thread: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Pravega-connector-cannot-recover-from-the-checkpoint-due-to-quot-Failure-to-finalize-checkpoint-quot-td36652.html
> Also in https://github.com/pravega/flink-connectors/issues/387



--

[jira] [Commented] (FLINK-17073) Slow checkpoint cleanup causing OOMs

2020-07-27 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17165518#comment-17165518
 ] 

Biao Liu commented on FLINK-17073:
--

To [~roman_khachatryan], thanks for the nice suggestions!

{quote}I think an alternative (or complementary) temporary solution is to use a 
bounded queue when creating ioExecutor.{quote}
I'm not a fan of this temporary solution. We would have to consider how to treat 
the callers that launch asynchronous IO operations through {{ioExecutor}} when 
the queue is full: make them fail, or make them wait until there is some space 
available? I'm afraid it's not a small amount of work to review all the places 
that call {{ioExecutor}}. If we want a temporary solution, maybe we could just 
increase the thread count. 

Regarding the long-term solution: actually Etienne and I have not discussed many 
of the implementation details. I just gave some suggestions to make sure it's 
going in the right direction. It's cool to have your detailed suggestions; they 
may help a lot for a contributor who is not familiar with this part. I just 
thought we don't have to discuss too many details here. It might be better to 
give the contributor more free space. We can pay more attention to code review 
to guarantee that the result is correct and reasonable.

BTW, just a tiny suggestion: code refactoring is not necessary; we should focus 
on solving the issue first. After that, we could consider whether some 
refactoring would make the code more readable or elegant. 

To [~echauchot], besides the implementation, is there any question about the 
plan? Please feel free to ask about anything that is unclear. 
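
For illustration, a minimal self-contained sketch (an assumed shape for such a 
bounded {{ioExecutor}}, not the actual Flink configuration) of the decision 
mentioned above: with a bounded queue, some rejection policy has to decide the 
caller's fate when the queue is full.
{code:java}
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedIoExecutorSketch {
    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();

        // A fixed-size pool with a bounded work queue. When the queue is full the
        // rejection policy decides what happens to the caller: CallerRunsPolicy
        // makes the submitting thread run (and thus block on) the task itself.
        ThreadPoolExecutor ioExecutor =
                new ThreadPoolExecutor(
                        cores,
                        cores,
                        0L,
                        TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<>(1_000),
                        new ThreadPoolExecutor.CallerRunsPolicy());

        for (int i = 0; i < 10; i++) {
            final int checkpointId = i;
            ioExecutor.execute(
                    () -> System.out.println("discarding checkpoint " + checkpointId));
        }

        ioExecutor.shutdown();
    }
}
{code}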

> Slow checkpoint cleanup causing OOMs
> 
>
> Key: FLINK-17073
> URL: https://issues.apache.org/jira/browse/FLINK-17073
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.7.3, 1.8.0, 1.9.0, 1.10.0, 1.11.0
>Reporter: Till Rohrmann
>Assignee: Etienne Chauchot
>Priority: Major
> Fix For: 1.12.0
>
>
> A user reported that he sees a decline in checkpoint cleanup speed when 
> upgrading from Flink 1.7.2 to 1.10.0. The result is that a lot of cleanup 
> tasks are waiting in the execution queue occupying memory. Ultimately, the JM 
> process dies with an OOM.
> Compared to Flink 1.7.2, we introduced a dedicated {{ioExecutor}} which is 
> used by the {{HighAvailabilityServices}} (FLINK-11851). Before, we use the 
> {{AkkaRpcService}} thread pool which was a {{ForkJoinPool}} with a max 
> parallelism of 64. Now it is a {{FixedThreadPool}} with as many threads as 
> CPU cores. This change might have caused the decline in completed checkpoint 
> discard throughput. This suspicion needs to be validated before trying to fix 
> it!
> [1] 
> https://lists.apache.org/thread.html/r390e5d775878918edca0b6c9f18de96f828c266a888e34ed30ce8494%40%3Cuser.flink.apache.org%3E



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18641) "Failure to finalize checkpoint" error in MasterTriggerRestoreHook

2020-07-27 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17166057#comment-17166057
 ] 

Biao Liu commented on FLINK-18641:
--

[~becket_qin], [~pnowojski], the ticket has not been assigned yet. Is there 
anyone working on this?

> "Failure to finalize checkpoint" error in MasterTriggerRestoreHook
> --
>
> Key: FLINK-18641
> URL: https://issues.apache.org/jira/browse/FLINK-18641
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.11.0
>Reporter: Brian Zhou
>Priority: Major
>
> https://github.com/pravega/flink-connectors is a Pravega connector for Flink. 
> The ReaderCheckpointHook[1] class uses the Flink `MasterTriggerRestoreHook` 
> interface to trigger the Pravega checkpoint during Flink checkpoints to make 
> sure the data recovery. The checkpoint recovery tests are running fine in 
> Flink 1.10, but it has below issues in Flink 1.11 causing the tests time out. 
> Suspect it is related to the checkpoint coordinator thread model changes in 
> Flink 1.11
> Error stacktrace:
> {code}
> 2020-07-09 15:39:39,999 30945 [jobmanager-future-thread-5] WARN  
> o.a.f.runtime.jobmaster.JobMaster - Error while processing checkpoint 
> acknowledgement message
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize 
> the pending checkpoint 3. Failure reason: Failure to finalize checkpoint.
>  at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1033)
>  at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:948)
>  at 
> org.apache.flink.runtime.scheduler.SchedulerBase.lambda$acknowledgeCheckpoint$4(SchedulerBase.java:802)
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.SerializedThrowable: Pending checkpoint has 
> not been fully acknowledged yet
>  at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>  at 
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:298)
>  at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1021)
>  ... 9 common frames omitted
> {code}
> More detail in this mailing thread: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Pravega-connector-cannot-recover-from-the-checkpoint-due-to-quot-Failure-to-finalize-checkpoint-quot-td36652.html
> Also in https://github.com/pravega/flink-connectors/issues/387



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17073) Slow checkpoint cleanup causing OOMs

2020-07-28 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17166244#comment-17166244
 ] 

Biao Liu commented on FLINK-17073:
--

BTW [~echauchot], before writing any code, it would be great to write an 
implementation plan first. That's a better place to discuss implementation 
details.
I heard some other people are also interested in this issue. It would be helpful 
for them to understand what is happening. Besides that, there will be some other 
PRs on {{CheckpointCoordinator}} at the same time. We have to make sure there is 
no big conflict between these changes.

> Slow checkpoint cleanup causing OOMs
> 
>
> Key: FLINK-17073
> URL: https://issues.apache.org/jira/browse/FLINK-17073
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.7.3, 1.8.0, 1.9.0, 1.10.0, 1.11.0
>Reporter: Till Rohrmann
>Assignee: Etienne Chauchot
>Priority: Major
> Fix For: 1.12.0
>
>
> A user reported that he sees a decline in checkpoint cleanup speed when 
> upgrading from Flink 1.7.2 to 1.10.0. The result is that a lot of cleanup 
> tasks are waiting in the execution queue occupying memory. Ultimately, the JM 
> process dies with an OOM.
> Compared to Flink 1.7.2, we introduced a dedicated {{ioExecutor}} which is 
> used by the {{HighAvailabilityServices}} (FLINK-11851). Before, we use the 
> {{AkkaRpcService}} thread pool which was a {{ForkJoinPool}} with a max 
> parallelism of 64. Now it is a {{FixedThreadPool}} with as many threads as 
> CPU cores. This change might have caused the decline in completed checkpoint 
> discard throughput. This suspicion needs to be validated before trying to fix 
> it!
> [1] 
> https://lists.apache.org/thread.html/r390e5d775878918edca0b6c9f18de96f828c266a888e34ed30ce8494%40%3Cuser.flink.apache.org%3E



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16945) Execute CheckpointFailureManager.FailJobCallback directly in main thread executor

2020-07-30 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17168439#comment-17168439
 ] 

Biao Liu commented on FLINK-16945:
--

[~dian.fu], yes, I'm working on this. It depends on some other issues for which 
I'm preparing PRs.

> Execute CheckpointFailureManager.FailJobCallback directly in main thread 
> executor
> -
>
> Key: FLINK-16945
> URL: https://issues.apache.org/jira/browse/FLINK-16945
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.10.0
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Since we have put all non-IO operations of {{CheckpointCoordinator}} into the 
> main thread executor, the {{CheckpointFailureManager.FailJobCallback}} can now 
> be executed directly. In this way the execution graph fails immediately 
> when {{CheckpointFailureManager}} invokes the callback. We can avoid the 
> inconsistent scenario of FLINK-13497.
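
A rough sketch of the idea (class and method names here are simplified assumptions, not the actual Flink classes): because the coordinator already runs its non-IO work in the main thread, the callback can be invoked directly instead of being re-scheduled:

{code}
interface FailJobCallback {
    void failJob(Throwable cause);
}

class CheckpointFailureManagerSketch {
    private final FailJobCallback failJobCallback;

    CheckpointFailureManagerSketch(FailJobCallback failJobCallback) {
        this.failJobCallback = failJobCallback;
    }

    void handleCheckpointException(Throwable cause) {
        // Before: mainThreadExecutor.execute(() -> failJobCallback.failJob(cause));
        // Now: we are already in the main thread, so fail the execution graph immediately.
        failJobCallback.failJob(cause);
    }
}
{code}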



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17073) Slow checkpoint cleanup causing OOMs

2020-08-03 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17170015#comment-17170015
 ] 

Biao Liu commented on FLINK-17073:
--

[~echauchot], sorry for the late reply. Thanks for pushing this!

I'm OK with [~roman_khachatryan]'s plan. It's indeed simpler to implement in some 
aspects. In my plan, we would have to consider how to avoid the synchronous 
cleaning you mentioned, because in the near future 
{{CheckpointCoordinator}} will no longer have a big lock. 

{quote}...we can drop new checkpoint requests when there are too many 
checkpoints to clean...{quote}
I think we should take care of the cleaning of both successful and failed 
checkpoints. 

I have left some comments in the doc.
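
To illustrate the quoted idea of dropping new requests, here is a rough sketch (the names are my assumption, not the actual {{CheckpointCoordinator}} code) of bounding the number of checkpoints waiting to be cleaned:

{code}
import java.util.concurrent.atomic.AtomicInteger;

class CleanupBackpressureSketch {
    private final AtomicInteger pendingCleanups = new AtomicInteger();
    private final int maxPendingCleanups;

    CleanupBackpressureSketch(int maxPendingCleanups) {
        this.maxPendingCleanups = maxPendingCleanups;
    }

    /** Checked before triggering a new checkpoint; covers successful and failed checkpoints alike. */
    boolean canTriggerCheckpoint() {
        return pendingCleanups.get() < maxPendingCleanups;
    }

    void onCleanupEnqueued() {
        pendingCleanups.incrementAndGet();
    }

    void onCleanupFinished() {
        pendingCleanups.decrementAndGet();
    }
}
{code}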

> Slow checkpoint cleanup causing OOMs
> 
>
> Key: FLINK-17073
> URL: https://issues.apache.org/jira/browse/FLINK-17073
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.7.3, 1.8.0, 1.9.0, 1.10.0, 1.11.0
>Reporter: Till Rohrmann
>Assignee: Etienne Chauchot
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>
> A user reported that he sees a decline in checkpoint cleanup speed when 
> upgrading from Flink 1.7.2 to 1.10.0. The result is that a lot of cleanup 
> tasks are waiting in the execution queue occupying memory. Ultimately, the JM 
> process dies with an OOM.
> Compared to Flink 1.7.2, we introduced a dedicated {{ioExecutor}} which is 
> used by the {{HighAvailabilityServices}} (FLINK-11851). Before, we used the 
> {{AkkaRpcService}} thread pool, which was a {{ForkJoinPool}} with a max 
> parallelism of 64. Now it is a {{FixedThreadPool}} with as many threads as 
> CPU cores. This change might have caused the decline in completed checkpoint 
> discard throughput. This suspicion needs to be validated before trying to fix 
> it!
> [1] 
> https://lists.apache.org/thread.html/r390e5d775878918edca0b6c9f18de96f828c266a888e34ed30ce8494%40%3Cuser.flink.apache.org%3E



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-12866) Travis building failed due to cache dir missing

2019-06-17 Thread Biao Liu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-12866:
-
Summary: Travis building failed due to cache dir missing  (was: Connectors 
module failed on Travis checking)

> Travis building failed due to cache dir missing
> ---
>
> Key: FLINK-12866
> URL: https://issues.apache.org/jira/browse/FLINK-12866
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Affects Versions: 1.9.0
>Reporter: Biao Liu
>Priority: Minor
>
> Here is the failure information.
> Cached flink dir /home/travis/flink_cache/37821/flink does not exist. Exiting 
> build.
> The command "./tools/travis_controller.sh connectors" exited with 1.
> Full log is here, https://travis-ci.org/apache/flink/jobs/546546647



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12866) Travis building failed due to cache dir missing

2019-06-17 Thread Biao Liu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-12866:
-
Description: 
Here is the failure information.
Cached flink dir /home/travis/flink_cache/37821/flink does not exist. Exiting 
build.
The command "./tools/travis_controller.sh connectors" exited with 1.

Some instances
https://travis-ci.org/apache/flink/jobs/546546647
https://travis-ci.org/apache/flink/builds/547016834

It seems that it's not just about some specific modules, so I changed the title.
Many Travis builds failed this morning for the same reason.
I'm not sure who is familiar with Travis, cc 

  was:
Here is the failure information.
Cached flink dir /home/travis/flink_cache/37821/flink does not exist. Exiting 
build.
The command "./tools/travis_controller.sh connectors" exited with 1.

Full log is here, https://travis-ci.org/apache/flink/jobs/546546647


> Travis building failed due to cache dir missing
> ---
>
> Key: FLINK-12866
> URL: https://issues.apache.org/jira/browse/FLINK-12866
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Affects Versions: 1.9.0
>Reporter: Biao Liu
>Priority: Minor
>
> Here is the failure information.
> Cached flink dir /home/travis/flink_cache/37821/flink does not exist. Exiting 
> build.
> The command "./tools/travis_controller.sh connectors" exited with 1.
> Some instances
> https://travis-ci.org/apache/flink/jobs/546546647
> https://travis-ci.org/apache/flink/builds/547016834
> It seems that it's not just about some specific modules, so I changed the 
> title.
> Many Travis builds failed this morning for the same reason.
> I'm not sure who is familiar with Travis, cc 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12866) Travis building failed due to cache dir missing

2019-06-17 Thread Biao Liu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-12866:
-
Description: 
Here is the failure information.
 Cached flink dir /home/travis/flink_cache/37821/flink does not exist. Exiting 
build.
 The command "./tools/travis_controller.sh connectors" exited with 1.

Some instances
 [https://travis-ci.org/apache/flink/jobs/546546647]

[https://travis-ci.org/apache/flink/builds/547016834]

[https://travis-ci.org/apache/flink/builds/547001579]

[https://travis-ci.org/apache/flink/builds/546971532]

 

It seems that it's not just about some specific modules, so I changed the title.
 Many Travis builds failed this morning for the same reason.
 I'm not sure who is familiar with Travis, cc [~StephanEwen]

  was:
Here is the failure information.
Cached flink dir /home/travis/flink_cache/37821/flink does not exist. Exiting 
build.
The command "./tools/travis_controller.sh connectors" exited with 1.

Some instances
https://travis-ci.org/apache/flink/jobs/546546647
https://travis-ci.org/apache/flink/builds/547016834

It seems that it's not just about some specific modules, so I changed the title.
Many Travis builds failed this morning for the same reason.
I'm not sure who is familiar with Travis, cc 


> Travis building failed due to cache dir missing
> ---
>
> Key: FLINK-12866
> URL: https://issues.apache.org/jira/browse/FLINK-12866
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Affects Versions: 1.9.0
>Reporter: Biao Liu
>Priority: Minor
>
> Here is the failure information.
>  Cached flink dir /home/travis/flink_cache/37821/flink does not exist. 
> Exiting build.
>  The command "./tools/travis_controller.sh connectors" exited with 1.
> Some instances
>  [https://travis-ci.org/apache/flink/jobs/546546647]
> [https://travis-ci.org/apache/flink/builds/547016834]
> [https://travis-ci.org/apache/flink/builds/547001579]
> [https://travis-ci.org/apache/flink/builds/546971532]
>  
> It seems that it's not just about some specific modules, so I changed the 
> title.
>  Many Travis builds failed this morning for the same reason.
>  I'm not sure who is familiar with Travis, cc [~StephanEwen]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12866) Travis building failed due to cache dir missing

2019-06-17 Thread Biao Liu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-12866:
-
Priority: Blocker  (was: Minor)

> Travis building failed due to cache dir missing
> ---
>
> Key: FLINK-12866
> URL: https://issues.apache.org/jira/browse/FLINK-12866
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Affects Versions: 1.9.0
>Reporter: Biao Liu
>Priority: Blocker
>
> Here is the failure information.
>  Cached flink dir /home/travis/flink_cache/37821/flink does not exist. 
> Exiting build.
>  The command "./tools/travis_controller.sh connectors" exited with 1.
> Some instances
>  [https://travis-ci.org/apache/flink/jobs/546546647]
> [https://travis-ci.org/apache/flink/builds/547016834]
> [https://travis-ci.org/apache/flink/builds/547001579]
> [https://travis-ci.org/apache/flink/builds/546971532]
>  
> It seems that it's not just about some specific modules, so I changed the 
> title.
>  Many Travis builds failed this morning for the same reason.
>  I'm not sure who is familiar with Travis, cc [~StephanEwen]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12961) StreamExecutionEnvironment supports executing job with StreamGraph

2019-06-24 Thread Biao Liu (JIRA)
Biao Liu created FLINK-12961:


 Summary: StreamExecutionEnvironment supports executing job with 
StreamGraph
 Key: FLINK-12961
 URL: https://issues.apache.org/jira/browse/FLINK-12961
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Reporter: Biao Liu
Assignee: Biao Liu
 Fix For: 1.9.0


Expose an internal method {{execute(StreamGraph)}} of 
{{StreamExecutionEnvironment}}. So the pluggable runner could get a chance to 
set properties of {{StreamGraph}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12974) Bump checkstyle version to 8.14

2019-06-25 Thread Biao Liu (JIRA)
Biao Liu created FLINK-12974:


 Summary: Bump checkstyle version to 8.14
 Key: FLINK-12974
 URL: https://issues.apache.org/jira/browse/FLINK-12974
 Project: Flink
  Issue Type: Improvement
  Components: Build System
Affects Versions: 1.8.0, 1.7.2
Reporter: Biao Liu
Assignee: Biao Liu
 Fix For: 1.9.0


The checkstyle description in the IDE setup document [1] is out of date. In the 
current checkstyle plugin (v5.28.0), there is no 8.12 option in the checkstyle 
settings dialog. Version 8.12 is no longer supported; checkstyle 
suggests upgrading to 8.14 "due to lack of breaking changes" [2].
It would confuse users that the document says "This step is important, don’t 
skip it!", but they cannot find the 8.12 option.
Since there are still some modules that do not use the checkstyle plugin, I have 
just manually checked some modules under version 8.14. It looks OK.
cc [~Zentol], is there a better way to check the bump?

1. 
https://ci.apache.org/projects/flink/flink-docs-master/flinkDev/ide_setup.html#checkstyle-for-java
2. 
https://github.com/jshiell/checkstyle-idea/commit/8504b286e9e443223db0b0f3b2d82e5bc183918f#diff-1698afbd7d9abfdb701fc6e471529013



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-4399) Add support for oversized messages

2019-06-26 Thread Biao Liu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-4399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu reassigned FLINK-4399:
---

Assignee: Biao Liu

> Add support for oversized messages
> --
>
> Key: FLINK-4399
> URL: https://issues.apache.org/jira/browse/FLINK-4399
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
> Environment: FLIP-6 feature branch
>Reporter: Stephan Ewen
>Assignee: Biao Liu
>Priority: Major
>  Labels: flip-6
>
> Currently, messages larger than the maximum Akka Framesize cause an error 
> when being transported. We should add a way to pass messages that are larger 
> than the Framesize, as may happen for:
>   - {{collect()}} calls that collect large data sets (via accumulators)
>   - Job submissions and operator deployments where the functions closures are 
> large (for example because it contains large pre-loaded data)
>   - Function restore in cases where restored state is larger than 
> checkpointed state (union state)
> I suggest to use the {{BlobManager}} to transfer large payload.
>   - On the sender side, oversized messages are stored under a transient blob 
> (which is deleted after first retrieval, or after a certain number of minutes)
>   - The sender sends a "pointer to blob message" instead.
>   - The receiver grabs the message from the blob upon receiving the pointer 
> message
> The RPC Service should be optionally initializable with a "large message 
> handler" which is internally the {{BlobManager}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-4399) Add support for oversized messages

2019-06-26 Thread Biao Liu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16873791#comment-16873791
 ] 

Biao Liu commented on FLINK-4399:
-

Recently at least two users have run into the oversized message issue. I'll try to 
resolve this. Since a general resolution is a bit complicated, I will attach 
a design doc to have a discussion first in the next few days.

> Add support for oversized messages
> --
>
> Key: FLINK-4399
> URL: https://issues.apache.org/jira/browse/FLINK-4399
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
> Environment: FLIP-6 feature branch
>Reporter: Stephan Ewen
>Assignee: Biao Liu
>Priority: Major
>  Labels: flip-6
>
> Currently, messages larger than the maximum Akka Framesize cause an error 
> when being transported. We should add a way to pass messages that are larger 
> than the Framesize, as may happen for:
>   - {{collect()}} calls that collect large data sets (via accumulators)
>   - Job submissions and operator deployments where the functions closures are 
> large (for example because it contains large pre-loaded data)
>   - Function restore in cases where restored state is larger than 
> checkpointed state (union state)
> I suggest to use the {{BlobManager}} to transfer large payload.
>   - On the sender side, oversized messages are stored under a transient blob 
> (which is deleted after first retrieval, or after a certain number of minutes)
>   - The sender sends a "pointer to blob message" instead.
>   - The receiver grabs the message from the blob upon receiving the pointer 
> message
> The RPC Service should be optionally initializable with a "large message 
> handler" which is internally the {{BlobManager}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-4399) Add support for oversized messages

2019-06-26 Thread Biao Liu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16873791#comment-16873791
 ] 

Biao Liu edited comment on FLINK-4399 at 6/27/19 6:56 AM:
--

Recently at least two users have run into the oversized message issue. I'll try to 
resolve this. Since a general resolution is a bit complicated, I will attach 
a design doc to have a discussion first in the next few days.


was (Author: sleepy):
Recently at-least two users met the oversized message issue. I'll try to 
resolve this. Since this general resolution is a bit complicated, I will attach 
a design doc to have a discussion first in next days.

> Add support for oversized messages
> --
>
> Key: FLINK-4399
> URL: https://issues.apache.org/jira/browse/FLINK-4399
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
> Environment: FLIP-6 feature branch
>Reporter: Stephan Ewen
>Assignee: Biao Liu
>Priority: Major
>  Labels: flip-6
>
> Currently, messages larger than the maximum Akka Framesize cause an error 
> when being transported. We should add a way to pass messages that are larger 
> than the Framesize, as may happen for:
>   - {{collect()}} calls that collect large data sets (via accumulators)
>   - Job submissions and operator deployments where the functions closures are 
> large (for example because it contains large pre-loaded data)
>   - Function restore in cases where restored state is larger than 
> checkpointed state (union state)
> I suggest to use the {{BlobManager}} to transfer large payload.
>   - On the sender side, oversized messages are stored under a transient blob 
> (which is deleted after first retrieval, or after a certain number of minutes)
>   - The sender sends a "pointer to blob message" instead.
>   - The receiver grabs the message from the blob upon receiving the pointer 
> message
> The RPC Service should be optionally initializable with a "large message 
> handler" which is internally the {{BlobManager}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-13042) Make slot sharing configurable on ExecutionConfig

2019-07-01 Thread Biao Liu (JIRA)
Biao Liu created FLINK-13042:


 Summary: Make slot sharing configurable on ExecutionConfig
 Key: FLINK-13042
 URL: https://issues.apache.org/jira/browse/FLINK-13042
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Reporter: Biao Liu
Assignee: Biao Liu
 Fix For: 1.9.0


The Blink batch planner requires a global setting to disable slot sharing. To 
support that, we will expose a {{PublicEvolving}} method on {{ExecutionConfig}} 
to globally disable slot sharing.

Note that this method might be removed if there is a better approach to 
satisfy the Blink batch planner in the future.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12961) StreamExecutionEnvironment supports executing job with StreamGraph

2019-07-01 Thread Biao Liu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16876086#comment-16876086
 ] 

Biao Liu commented on FLINK-12961:
--

This approach is abandoned since we decided to use the alternative solution 
described in FLINK-13041 and FLINK-13042.

> StreamExecutionEnvironment supports executing job with StreamGraph
> --
>
> Key: FLINK-12961
> URL: https://issues.apache.org/jira/browse/FLINK-12961
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Expose an internal method {{execute(StreamGraph)}} of 
> {{StreamExecutionEnvironment}}. So the pluggable runner could get a chance to 
> set properties of {{StreamGraph}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-12961) StreamExecutionEnvironment supports executing job with StreamGraph

2019-07-01 Thread Biao Liu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12961?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu closed FLINK-12961.

Resolution: Abandoned

> StreamExecutionEnvironment supports executing job with StreamGraph
> --
>
> Key: FLINK-12961
> URL: https://issues.apache.org/jira/browse/FLINK-12961
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Expose an internal method {{execute(StreamGraph)}} of 
> {{StreamExecutionEnvironment}}. So the pluggable runner could get a chance to 
> set properties of {{StreamGraph}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-13042) Make slot sharing configurable on StreamExecutionEnvironment

2019-07-02 Thread Biao Liu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-13042:
-
Summary: Make slot sharing configurable on StreamExecutionEnvironment  
(was: Make slot sharing configurable on ExecutionConfig)

> Make slot sharing configurable on StreamExecutionEnvironment
> 
>
> Key: FLINK-13042
> URL: https://issues.apache.org/jira/browse/FLINK-13042
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
> Fix For: 1.9.0
>
>
> The Blink batch planner requires a global setting to disable slot sharing. To 
> support that, we will expose a {{PublicEvolving}} method on {{ExecutionConfig}} 
> to globally disable slot sharing.
> Note that this method might be removed if there is a better approach to 
> satisfy the Blink batch planner in the future.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-13042) Make slot sharing configurable on StreamExecutionEnvironment

2019-07-02 Thread Biao Liu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-13042:
-
Description: 
The Blink batch planner requires a global setting to disable slot sharing. To 
support that, we will expose a {{PublicEvolving}} method on 
{{StreamExecutionEnvironment}} to globally disable slot sharing.

Note that this method might be removed if there is a better approach to 
satisfy the Blink batch planner in the future.

  was:
The Blink batch planner requires a global setting to disable slot sharing. To 
support that, we will expose a {{PublicEvolving}} method on {{ExecutionConfig}} 
to globally disable slot sharing.

Note that this method might be removed if there is a better approach to 
satisfy the Blink batch planner in the future.


> Make slot sharing configurable on StreamExecutionEnvironment
> 
>
> Key: FLINK-13042
> URL: https://issues.apache.org/jira/browse/FLINK-13042
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
> Fix For: 1.9.0
>
>
> The Blink batch planner requires a global setting to disable slot sharing. To 
> support that, we will expose a {{PublicEvolving}} method on 
> {{StreamExecutionEnvironment}} to globally disable slot sharing.
> Note that this method might be removed if there is a better approach to 
> satisfy the Blink batch planner in the future.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-13098) Add a new type UNDEFINED of shuffle mode

2019-07-04 Thread Biao Liu (JIRA)
Biao Liu created FLINK-13098:


 Summary: Add a new type UNDEFINED of shuffle mode
 Key: FLINK-13098
 URL: https://issues.apache.org/jira/browse/FLINK-13098
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Reporter: Biao Liu
Assignee: Biao Liu
 Fix For: 1.9.0


The {{UNDEFINED}} type is the default value of shuffle mode. If there is no 
specific {{PartitionTransformation}}, the shuffle mode would be {{UNDEFINED}}.
 This new shuffle type leaves some space for optimization later. The 
optimization might be based on resources or some global settings.
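
A small sketch of the intent (the enum and method names are assumptions for illustration, not the exact Flink API): {{UNDEFINED}} simply defers the decision to a later translation/optimization step.

{code}
enum ShuffleModeSketch {
    PIPELINED,
    BATCH,
    UNDEFINED;

    /** Resolve UNDEFINED at translation time, e.g. based on resources or global settings. */
    static ShuffleModeSketch resolve(ShuffleModeSketch requested, boolean preferBlocking) {
        if (requested != UNDEFINED) {
            return requested;
        }
        return preferBlocking ? BATCH : PIPELINED;
    }
}
{code}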



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-13099) Add a new type UNDEFINED of shuffle mode

2019-07-04 Thread Biao Liu (JIRA)
Biao Liu created FLINK-13099:


 Summary: Add a new type UNDEFINED of shuffle mode
 Key: FLINK-13099
 URL: https://issues.apache.org/jira/browse/FLINK-13099
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Reporter: Biao Liu
Assignee: Biao Liu
 Fix For: 1.9.0


The {{UNDEFINED}} type is the default value of shuffle mode. If there is no 
specific {{PartitionTransformation}}, the shuffle mode would be {{UNDEFINED}}.
 This new shuffle type leaves some space for optimization later. The 
optimization might be based on resources or some global settings.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-13099) Add a new type UNDEFINED of shuffle mode

2019-07-04 Thread Biao Liu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16878501#comment-16878501
 ] 

Biao Liu commented on FLINK-13099:
--

Oops! The connection seemed to have something wrong for a while. This ticket is 
a duplicate.

> Add a new type UNDEFINED of shuffle mode
> 
>
> Key: FLINK-13099
> URL: https://issues.apache.org/jira/browse/FLINK-13099
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
> Fix For: 1.9.0
>
>
> The {{UNDEFINED}} type is the default value of shuffle mode. If there is no 
> specific {{PartitionTransformation}}, the shuffle mode would be {{UNDEFINED}}.
>  This new shuffle type leaves some space for optimization later. The 
> optimization might be based on resources or some global settings.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-13099) Add a new type UNDEFINED of shuffle mode

2019-07-04 Thread Biao Liu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu closed FLINK-13099.

Resolution: Duplicate

> Add a new type UNDEFINED of shuffle mode
> 
>
> Key: FLINK-13099
> URL: https://issues.apache.org/jira/browse/FLINK-13099
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
> Fix For: 1.9.0
>
>
> The {{UNDEFINED}} type is the default value of shuffle mode. If there is no 
> specific {{PartitionTransformation}}, the shuffle mode would be {{UNDEFINED}}.
>  This new shuffle type leaves some space for optimization later. The 
> optimization might be based on resources or some global settings.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-13101) Introduce "blocking after chaining off" property of StreamGraph

2019-07-04 Thread Biao Liu (JIRA)
Biao Liu created FLINK-13101:


 Summary: Introduce "blocking after chaining off" property of 
StreamGraph
 Key: FLINK-13101
 URL: https://issues.apache.org/jira/browse/FLINK-13101
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Reporter: Biao Liu
Assignee: Biao Liu
 Fix For: 1.9.0


The property "blocking after chaining off" means, if there are some stream 
edges that can not be chained and the shuffle mode of edge is not specified, 
translate these edges into {{BLOCKING}} result partition type.

The reason of introducing it is to satisfy the requirement of Blink batch 
planner. Because the current scheduling strategy is a bit simple. It can not 
support some complex scenarios, like a batch job with resources limited.

To be honest, it's probably a work-around solution. However it's an internal 
implementation, we can replace it when we are able to support batch job by 
scheduling strategy.
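
A hedged sketch of the translation rule described above (the names below are assumptions, not the exact StreamingJobGraphGenerator code):

{code}
class ChainBoundarySketch {
    enum ShuffleMode { PIPELINED, BATCH, UNDEFINED }
    enum ResultPartitionType { PIPELINED, BLOCKING }

    static ResultPartitionType translateEdge(
            boolean edgeIsChainable,
            ShuffleMode shuffleMode,
            boolean blockingAfterChainingOff) {
        // Only edges that cannot be chained and whose shuffle mode was left
        // unspecified (UNDEFINED) are forced to BLOCKING by this property.
        if (!edgeIsChainable
                && shuffleMode == ShuffleMode.UNDEFINED
                && blockingAfterChainingOff) {
            return ResultPartitionType.BLOCKING;
        }
        return ResultPartitionType.PIPELINED;
    }
}
{code}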



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (FLINK-12961) StreamExecutionEnvironment supports executing job with StreamGraph

2019-07-05 Thread Biao Liu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12961?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu reopened FLINK-12961:
--

We need to re-think the proposal...

> StreamExecutionEnvironment supports executing job with StreamGraph
> --
>
> Key: FLINK-12961
> URL: https://issues.apache.org/jira/browse/FLINK-12961
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Expose an internal method {{execute(StreamGraph)}} of 
> {{StreamExecutionEnvironment}}. So the pluggable runner could get a chance to 
> set properties of {{StreamGraph}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-13101) Introduce blockingConnectionsBetweenChains property of StreamGraph

2019-07-08 Thread Biao Liu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-13101:
-
Description: 
The property blockingConnectionsBetweenChains means that if there are stream 
edges which cannot be chained and whose shuffle mode is not specified, these 
edges are translated into the {{BLOCKING}} result partition type.

The reason for introducing it is to satisfy a requirement of the Blink batch 
planner. The current scheduling strategy is a bit simple; it cannot 
support some complex scenarios, like a batch job with limited resources.

To be honest, it's probably a work-around. However, since it's an internal 
implementation detail, we can replace it once the scheduling strategy is able 
to support batch jobs.

  was:
The property "blocking after chaining off" means, if there are some stream 
edges that can not be chained and the shuffle mode of edge is not specified, 
translate these edges into {{BLOCKING}} result partition type.

The reason of introducing it is to satisfy the requirement of Blink batch 
planner. Because the current scheduling strategy is a bit simple. It can not 
support some complex scenarios, like a batch job with resources limited.

To be honest, it's probably a work-around solution. However it's an internal 
implementation, we can replace it when we are able to support batch job by 
scheduling strategy.


> Introduce blockingConnectionsBetweenChains property of StreamGraph
> --
>
> Key: FLINK-13101
> URL: https://issues.apache.org/jira/browse/FLINK-13101
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> The property blockingConnectionsBetweenChains means that if there are stream 
> edges which cannot be chained and whose shuffle mode is not specified, these 
> edges are translated into the {{BLOCKING}} result partition type.
> The reason for introducing it is to satisfy a requirement of the Blink batch 
> planner. The current scheduling strategy is a bit simple; it cannot 
> support some complex scenarios, like a batch job with limited resources.
> To be honest, it's probably a work-around. However, since it's an internal 
> implementation detail, we can replace it once the scheduling strategy is able 
> to support batch jobs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-13101) Introduce blockingConnectionsBetweenChains property of StreamGraph

2019-07-08 Thread Biao Liu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-13101:
-
Summary: Introduce blockingConnectionsBetweenChains property of StreamGraph 
 (was: Introduce "blocking after chaining off" property of StreamGraph)

> Introduce blockingConnectionsBetweenChains property of StreamGraph
> --
>
> Key: FLINK-13101
> URL: https://issues.apache.org/jira/browse/FLINK-13101
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> The property "blocking after chaining off" means, if there are some stream 
> edges that can not be chained and the shuffle mode of edge is not specified, 
> translate these edges into {{BLOCKING}} result partition type.
> The reason of introducing it is to satisfy the requirement of Blink batch 
> planner. Because the current scheduling strategy is a bit simple. It can not 
> support some complex scenarios, like a batch job with resources limited.
> To be honest, it's probably a work-around solution. However it's an internal 
> implementation, we can replace it when we are able to support batch job by 
> scheduling strategy.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-13042) Make slot sharing configurable on StreamExecutionEnvironment

2019-07-09 Thread Biao Liu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu closed FLINK-13042.

Resolution: Won't Fix

For 1.9, we don't need this anymore.

> Make slot sharing configurable on StreamExecutionEnvironment
> 
>
> Key: FLINK-13042
> URL: https://issues.apache.org/jira/browse/FLINK-13042
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The Blink batch planner requires a global setting to disable slot sharing. To 
> support that, we will expose a {{PublicEvolving}} method on 
> {{StreamExecutionEnvironment}} to globally disable slot sharing.
> Note that this method might be removed if there is a better approach to 
> satisfy the Blink batch planner in the future.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-12681) Provide an implementation of a thread-safe counter

2019-07-09 Thread Biao Liu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12681?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu closed FLINK-12681.

  Resolution: Won't Fix
Release Note: The PR was ignored. I guess the community thinks that we don't 
need to provide an implementation, so I'm just closing this.

> Provide an implementation of a thread-safe counter
> --
>
> Key: FLINK-12681
> URL: https://issues.apache.org/jira/browse/FLINK-12681
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Reporter: Piyush Goyal
>Assignee: Biao Liu
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> While adding metrics to Elasticsearch connector, we noticed that the 
> SimpleCounter which seems like the main implementation of Counter interface 
> is not thread-safe. It makes it tricky/expensive to use it in a 
> multi-threaded context. 
> [~SleePy] mentioned that SimpleCounter is being used in many 
> performance-sensitive code paths, so instead of changing it directly, we can 
> provide a new thread-safe implementation of Counter.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12297) Clean the closure for OutputTags in PatternStream

2019-04-23 Thread Biao Liu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16823816#comment-16823816
 ] 

Biao Liu commented on FLINK-12297:
--

It seems that there are a lot of non-cleaned components being passed around; see 
another issue, [FLINK-12113|https://issues.apache.org/jira/browse/FLINK-12113].

We can easily fix these issues case by case. However, I'm not sure whether there 
is any clean way to solve these problems.

> Clean the closure for OutputTags in PatternStream
> -
>
> Key: FLINK-12297
> URL: https://issues.apache.org/jira/browse/FLINK-12297
> Project: Flink
>  Issue Type: Bug
>  Components: Library / CEP
>Affects Versions: 1.8.0
>Reporter: Dawid Wysakowicz
>Priority: Blocker
> Fix For: 1.9.0, 1.8.1
>
>
> Right now we do not invoke closure cleaner on output tags. Therefore such 
> code:
> {code}
> @Test
> public void testFlatSelectSerialization() throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     DataStreamSource<Integer> elements = env.fromElements(1, 2, 3);
>     OutputTag<Integer> outputTag = new OutputTag<Integer>("AAA") {};
>     CEP.pattern(elements, Pattern.<Integer>begin("A")).flatSelect(
>         outputTag,
>         new PatternFlatTimeoutFunction<Integer, Integer>() {
>             @Override
>             public void timeout(
>                     Map<String, List<Integer>> pattern,
>                     long timeoutTimestamp,
>                     Collector<Integer> out) throws Exception {
>             }
>         },
>         new PatternFlatSelectFunction<Integer, Integer>() {
>             @Override
>             public void flatSelect(Map<String, List<Integer>> pattern, Collector<Integer> out) throws Exception {
>             }
>         }
>     );
>     env.execute();
> }
> {code}
> will fail with {{The implementation of the PatternFlatSelectAdapter is not 
> serializable. }} exception



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12297) Clean the closure for OutputTags in PatternStream

2019-04-23 Thread Biao Liu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16823870#comment-16823870
 ] 

Biao Liu commented on FLINK-12297:
--

BTW, {{OutputTag}} is not an interface; in this case, I can't see the reason for 
extending it.
Is it possible that the original author thought {{OutputTag}} should not 
be extended?
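
For context (my understanding, not part of the original report): extending {{OutputTag}} anonymously, as in the quoted snippet below, is the usual way to let the generic element type survive erasure so it can be extracted reflectively, similar to {{TypeHint}}; passing the {{TypeInformation}} explicitly is the alternative:

{code}
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.util.OutputTag;

class OutputTagTypeCaptureSketch {
    // Capturing the element type via an anonymous subclass (the type survives erasure)...
    static final OutputTag<Integer> VIA_SUBCLASS = new OutputTag<Integer>("AAA") {};

    // ...or handing the TypeInformation to the constructor explicitly.
    static final OutputTag<Integer> VIA_TYPE_INFO = new OutputTag<>("AAA", Types.INT);
}
{code}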

> Clean the closure for OutputTags in PatternStream
> -
>
> Key: FLINK-12297
> URL: https://issues.apache.org/jira/browse/FLINK-12297
> Project: Flink
>  Issue Type: Bug
>  Components: Library / CEP
>Affects Versions: 1.8.0
>Reporter: Dawid Wysakowicz
>Priority: Blocker
> Fix For: 1.9.0, 1.8.1
>
>
> Right now we do not invoke closure cleaner on output tags. Therefore such 
> code:
> {code}
> @Test
> public void testFlatSelectSerialization() throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     DataStreamSource<Integer> elements = env.fromElements(1, 2, 3);
>     OutputTag<Integer> outputTag = new OutputTag<Integer>("AAA") {};
>     CEP.pattern(elements, Pattern.<Integer>begin("A")).flatSelect(
>         outputTag,
>         new PatternFlatTimeoutFunction<Integer, Integer>() {
>             @Override
>             public void timeout(
>                     Map<String, List<Integer>> pattern,
>                     long timeoutTimestamp,
>                     Collector<Integer> out) throws Exception {
>             }
>         },
>         new PatternFlatSelectFunction<Integer, Integer>() {
>             @Override
>             public void flatSelect(Map<String, List<Integer>> pattern, Collector<Integer> out) throws Exception {
>             }
>         }
>     );
>     env.execute();
> }
> {code}
> will fail with {{The implementation of the PatternFlatSelectAdapter is not 
> serializable. }} exception



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12285) Memory leak in SavepointITCase and SavepointMigrationTestBase

2019-04-23 Thread Biao Liu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16823913#comment-16823913
 ] 

Biao Liu commented on FLINK-12285:
--

Hi [~xiaogang.shi],
Thank you for reporting.
I think we should improve this on both the test case and Flink component 
sides. The test cases should wait for the job state to become terminal, and Flink 
components (like {{TaskExecutor}}) should support a graceful way to exit.

> Memory leak in SavepointITCase and SavepointMigrationTestBase
> -
>
> Key: FLINK-12285
> URL: https://issues.apache.org/jira/browse/FLINK-12285
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Reporter: Xiaogang Shi
>Assignee: Biao Liu
>Priority: Major
>
> The tests in {{SavepointITCase}} and {{SavepointMigrationTestBase}} do not 
> cancel running jobs before exit. It will cause exceptions in 
> {{TaskExecutor}}s and unreleased memory segments. Succeeding tests may fail 
> due to insufficient amount of memory.
> The problem is caused by cancelling {{TaskExecutor}}s with running tasks. 
> Another issue caused by the reason can be seen in FLINK-11343. Maybe we can 
> find a more dedicated method to cancel those {{TaskExecutor}}s still having 
> running tasks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12332) Cancel running tasks if exist before shutting down TM

2019-04-25 Thread Biao Liu (JIRA)
Biao Liu created FLINK-12332:


 Summary: Cancel running tasks if exist before shutting down TM
 Key: FLINK-12332
 URL: https://issues.apache.org/jira/browse/FLINK-12332
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Reporter: Biao Liu
Assignee: Biao Liu
 Fix For: 1.9.0


This is an improvement that came out of 
[FLINK-12285|https://issues.apache.org/jira/browse/FLINK-12285].
The {{TM}} should cancel running tasks before shutting itself down. This graceful 
way to exit is especially useful in test cases which might be sensitive 
to error checking.
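
A minimal sketch of the intended shutdown order (the interface and method names are assumptions, not the real TaskExecutor API):

{code}
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class GracefulTaskManagerShutdownSketch {
    interface RunningTask {
        CompletableFuture<Void> cancel();
    }

    static void shutDown(List<RunningTask> runningTasks, Duration timeout) throws Exception {
        // 1. Cancel all tasks that are still running...
        CompletableFuture<Void> allCancelled = CompletableFuture.allOf(
                runningTasks.stream().map(RunningTask::cancel).toArray(CompletableFuture[]::new));

        // 2. ...and only afterwards release slots/memory and exit, so error-sensitive
        //    tests don't observe spurious exceptions or leaked memory segments.
        allCancelled.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
    }
}
{code}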



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12332) Cancel running tasks if exist before shutting down TM

2019-04-25 Thread Biao Liu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12332?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-12332:
-
External issue URL:   (was: 
https://issues.apache.org/jira/browse/FLINK-12285)

> Cancel running tasks if exist before shutting down TM
> -
>
> Key: FLINK-12332
> URL: https://issues.apache.org/jira/browse/FLINK-12332
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Minor
> Fix For: 1.9.0
>
>
> This is an improvement that came out of 
> [FLINK-12285|https://issues.apache.org/jira/browse/FLINK-12285].
> The {{TM}} should cancel running tasks before shutting itself down. This graceful 
> way to exit is especially useful in test cases which might be sensitive 
> to error checking.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-14344) Snapshot master hook state asynchronously

2019-10-08 Thread Biao Liu (Jira)
Biao Liu created FLINK-14344:


 Summary: Snapshot master hook state asynchronously
 Key: FLINK-14344
 URL: https://issues.apache.org/jira/browse/FLINK-14344
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Checkpointing
Reporter: Biao Liu
 Fix For: 1.10.0


Currently we snapshot the master hook state synchronously. As a part of 
reworking the threading model of {{CheckpointCoordinator}}, we have to make this 
non-blocking to satisfy the requirement of running in the main thread.

The behavior of snapshotting master hook state should be similar to task state 
snapshotting. It should be launched after the {{PendingCheckpoint}} is created. It 
could complete or fail the {{PendingCheckpoint}}, like task state snapshotting. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14344) Snapshot master hook state asynchronously

2019-10-09 Thread Biao Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-14344:
-
Description: 
Currently we snapshot the master hook state synchronously. As a part of 
reworking threading model of {{CheckpointCoordinator}}, we have to make this 
non-blocking to satisfy the requirement of running in main thread.

The behavior of snapshotting master hook state is similar to task state 
snapshotting. Master state snapshotting is taken before task state 
snapshotting. Because in master hook, there might be external system 
initialization which task state snapshotting might depends on.


  was:
Currently we snapshot the master hook state synchronously. As a part of 
reworking threading model of {{CheckpointCoordinator}}, we have to make this 
non-blocking to satisfy the requirement of running in main thread.

The behavior of snapshotting master hook state should be similar to task state 
snapshotting. It should be launched after \{{PendingCheckpoint}} created. It 
could complete or fail the {{PendingCheckpoint}} like task state snapshotting. 


> Snapshot master hook state asynchronously
> -
>
> Key: FLINK-14344
> URL: https://issues.apache.org/jira/browse/FLINK-14344
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Reporter: Biao Liu
>Priority: Major
> Fix For: 1.10.0
>
>
> Currently we snapshot the master hook state synchronously. As a part of 
> reworking threading model of {{CheckpointCoordinator}}, we have to make this 
> non-blocking to satisfy the requirement of running in main thread.
> The behavior of snapshotting master hook state is similar to task state 
> snapshotting. Master state snapshotting is taken before task state 
> snapshotting. Because in master hook, there might be external system 
> initialization which task state snapshotting might depends on.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14344) Snapshot master hook state asynchronously

2019-10-09 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16947662#comment-16947662
 ] 

Biao Liu commented on FLINK-14344:
--

When I started implementing this part, I realized there is one more thing that 
needs to be discussed. I would like to make the semantics of 
{{MasterTriggerRestoreHook#triggerCheckpoint}} clearer.

At the moment {{MasterTriggerRestoreHook}} is not an official public interface 
(at least from the perspective of the code). I'm not sure how big the side 
effects might be if I changed the interface or its behavior.

The problem is that the interface currently says (in its Javadoc) that both 
synchronous and asynchronous implementations are OK. This makes things 
more complex. I can't figure out a proper asynchronous way to handle both of 
these scenarios at the same time.

I was planning to execute {{MasterTriggerRestoreHook#triggerCheckpoint}} in the IO 
executor directly. That is fine for the synchronous scenario, but there 
might be a deadlock in the asynchronous scenario. Currently the IO 
executor is given to the master hook as an input parameter. If we execute the hook 
in the IO executor, and the implementation of 
{{MasterTriggerRestoreHook#triggerCheckpoint}} launches an asynchronous 
operation and then waits for its result for some reason, it might deadlock if 
that asynchronous operation is scheduled on the same IO thread: the waiting 
blocks the later asynchronous operation, so it can never finish.
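
To make the risk concrete, a minimal sketch (not Flink code; a single-threaded executor stands in for the IO executor):

{code}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class HookDeadlockSketch {
    public static void main(String[] args) throws Exception {
        ExecutorService ioExecutor = Executors.newSingleThreadExecutor();

        Runnable hook = () -> {
            // The hook launches an asynchronous operation on the same executor...
            Future<String> async = ioExecutor.submit(() -> "external state handle");
            try {
                // ...and then waits for it synchronously. The only IO thread is busy
                // running the hook itself, so the submitted task can never start.
                System.out.println(async.get(5, TimeUnit.SECONDS));
            } catch (TimeoutException e) {
                System.out.println("deadlocked: the inner task never got a thread");
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };

        ioExecutor.submit(hook).get();
        ioExecutor.shutdownNow();
    }
}
{code}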

IMO synchronous and asynchronous interfaces should be different and should be 
treated in different ways.
 # The synchronous invocation returns a value directly, not a completable future. 
The method is executed in an IO thread under a proper lock (which could be the hook 
itself). It could be launched by {{CheckpointCoordinator}} directly. The 
implementation of {{MasterTriggerRestoreHook}} would not see the IO executor at 
all. However, in this way we break compatibility.
 # The asynchronous invocation returns a completable future, just like the 
current one. The {{MasterTriggerRestoreHook#triggerCheckpoint}} method itself 
performs no IO operation; all heavy IO operations are executed in the IO 
executor which is given to the master hook as an input parameter. There is also an 
advantage of this asynchronous interface: we could avoid competition on all 
methods of {{MasterTriggerRestoreHook}} (run in the main thread) except the real 
asynchronous part (the user must guarantee it is thread-safe or under a proper 
lock) executed in an IO thread. In this way, we keep compatibility on the 
surface, although we change the behavior somewhat. We could emphasize the change 
in the Javadoc and release notes.

If nobody can be sure about the side effects, I could start a survey or a 
discussion on the mailing list. What do you think? Any feedback is appreciated.
 cc [~sewen], [~trohrmann], [~pnowojski]

> Snapshot master hook state asynchronously
> -
>
> Key: FLINK-14344
> URL: https://issues.apache.org/jira/browse/FLINK-14344
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Reporter: Biao Liu
>Priority: Major
> Fix For: 1.10.0
>
>
> Currently we snapshot the master hook state synchronously. As a part of 
> reworking threading model of {{CheckpointCoordinator}}, we have to make this 
> non-blocking to satisfy the requirement of running in main thread.
> The behavior of snapshotting master hook state is similar to task state 
> snapshotting. Master state snapshotting is taken before task state 
> snapshotting. Because in master hook, there might be external system 
> initialization which task state snapshotting might depends on.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14344) Snapshot master hook state asynchronously

2019-10-09 Thread Biao Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-14344:
-
Description: 
Currently we snapshot the master hook state synchronously. As a part of 
reworking threading model of {{CheckpointCoordinator}}, we have to make this 
non-blocking to satisfy the requirement of running in main thread.

The behavior of snapshotting master hook state is similar to task state 
snapshotting. Master state snapshotting is taken before task state 
snapshotting. Because in master hook, there might be external system 
initialization which task state snapshotting might depend on.

  was:
Currently we snapshot the master hook state synchronously. As a part of 
reworking threading model of {{CheckpointCoordinator}}, we have to make this 
non-blocking to satisfy the requirement of running in main thread.

The behavior of snapshotting master hook state is similar to task state 
snapshotting. Master state snapshotting is taken before task state 
snapshotting. Because in master hook, there might be external system 
initialization which task state snapshotting might depends on.



> Snapshot master hook state asynchronously
> -
>
> Key: FLINK-14344
> URL: https://issues.apache.org/jira/browse/FLINK-14344
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Reporter: Biao Liu
>Priority: Major
> Fix For: 1.10.0
>
>
> Currently we snapshot the master hook state synchronously. As a part of 
> reworking threading model of {{CheckpointCoordinator}}, we have to make this 
> non-blocking to satisfy the requirement of running in main thread.
> The behavior of snapshotting master hook state is similar to task state 
> snapshotting. Master state snapshotting is taken before task state 
> snapshotting. Because in master hook, there might be external system 
> initialization which task state snapshotting might depend on.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14344) Snapshot master hook state asynchronously

2019-10-11 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16949578#comment-16949578
 ] 

Biao Liu commented on FLINK-14344:
--

Hi [~pnowojski], thanks for the feedback.

Regarding the current code, yes, you are right. It confuses me as well. 
There is a comment before the waiting part of 
{{MasterHooks#triggerMasterHooks}}. It says "in the future we want to make this 
asynchronous with futures (no pun intended)". I think it means it should be 
asynchronous, but it just hasn't been done yet for some reason. So I guess it 
should be asynchronous by design.
{quote}I'm not sure why should we execute the hooks in the IO Executor, why not 
master thread?
{quote}
Actually, at the very beginning I thought it should be executed in the main thread. 
However, I found a comment on 
{{MasterTriggerRestoreHook#triggerCheckpoint}}: "If the action by this hook 
needs to be executed synchronously, then this method should directly execute 
the action synchronously and block until it is complete". Based on this 
description, I'm afraid there might be a risk of executing a blocking operation 
in the main thread. So I tried to execute it in an IO thread, but there is another 
risk of deadlock.

It's hard to make the right decision without clear semantics. I have started a 
survey on both the dev and user mailing lists to find out how developers and users 
use this interface. There is no response yet. I guess there are not too many users 
depending on it.

If there is no opposing opinion in the next few days, I intend to treat it as 
asynchronous, executing it in the main thread. But the Javadoc needs to be changed, 
removing the part about "blocking until it is complete" and emphasizing 
that it should be non-blocking.

If there are opposing voices and we can't reach an agreement, then we 
could make it synchronous by changing the method signature (removing the 
executor and the completable future).

What do you think?

> Snapshot master hook state asynchronously
> -
>
> Key: FLINK-14344
> URL: https://issues.apache.org/jira/browse/FLINK-14344
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
> Fix For: 1.10.0
>
>
> Currently we snapshot the master hook state synchronously. As a part of 
> reworking threading model of {{CheckpointCoordinator}}, we have to make this 
> non-blocking to satisfy the requirement of running in main thread.
> The behavior of snapshotting master hook state is similar to task state 
> snapshotting. Master state snapshotting is taken before task state 
> snapshotting. Because in master hook, there might be external system 
> initialization which task state snapshotting might depend on.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14344) Snapshot master hook state asynchronously

2019-10-14 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16950976#comment-16950976
 ] 

Biao Liu commented on FLINK-14344:
--

Hi [~pnowojski],
{quote}I think ideally I would prefer to make a contract that sync master hooks 
should be non blocking executed in the main thread. Async hooks could also be 
executed by the main thread and user should take care of spawning/re-using his 
own thread to actually execute the async work (just as in AsyncWaitOperator). 
If user executes blocking code, let him shoot himself in the foot...{quote}
It makes sense to me. So there are three options now.
1. {{MasterState syncSnapshotHook(...)}}
2. {{CompletableFuture asyncSnapshotHook(ioExecutor)}}
3. {{CompletableFuture asyncSnapshotHook()}}

The only difference between options 2 and 3 is whether we provide an IO executor 
or not. I tend to choose option 2, but option 3 is also acceptable to me (a rough 
sketch of the signatures follows below).
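
To make the options concrete, here is a rough sketch of what the signatures could look like (the generics, parameter lists, and the {{MasterState}} placeholder are my assumptions, not the actual interface):

{code}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

class MasterState {}

interface SnapshotHookOptionsSketch {
    // Option 1: synchronous, returns the state directly; the coordinator decides where to run it.
    MasterState syncSnapshotHook(long checkpointId, long timestamp) throws Exception;

    // Option 2: asynchronous, heavy IO runs on the executor handed in by the coordinator.
    CompletableFuture<MasterState> asyncSnapshotHook(long checkpointId, long timestamp, Executor ioExecutor);

    // Option 3: asynchronous, the hook spawns or reuses its own threads.
    CompletableFuture<MasterState> asyncSnapshotHook(long checkpointId, long timestamp);
}
{code}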
{quote}unless we schedule periodic actions always in some separate thread, 
first think we do is to execute the hooks in that thread, and only after 
execute the hooks, we enqueue follow up work in the main thread?{quote}
I think we should schedule periodic actions in the main thread as planned. That's 
one of the biggest targets of this reworking: making it single-threaded and 
lock-free (removing the trigger/coordinator-wide lock).
{quote}...we would have to make sure that none of the CheckpointCoordinator 
actions will be triggered until that other thread finish its work.{quote}
I think we have to face this problem. The master hook is designed to do some 
IO operations, and we can't wait for the result synchronously. It should be done 
with a {{CompletableFuture}} and the main thread executor. 

> Snapshot master hook state asynchronously
> -
>
> Key: FLINK-14344
> URL: https://issues.apache.org/jira/browse/FLINK-14344
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently we snapshot the master hook state synchronously. As a part of 
> reworking threading model of {{CheckpointCoordinator}}, we have to make this 
> non-blocking to satisfy the requirement of running in main thread.
> The behavior of snapshotting master hook state is similar to task state 
> snapshotting. Master state snapshotting is taken before task state 
> snapshotting. Because in master hook, there might be external system 
> initialization which task state snapshotting might depend on.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14344) A preparation for snapshotting master hook state asynchronously

2019-11-13 Thread Biao Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-14344:
-
Summary: A preparation for snapshotting master hook state asynchronously  
(was: Snapshot master hook state asynchronously)

> A preparation for snapshotting master hook state asynchronously
> ---
>
> Key: FLINK-14344
> URL: https://issues.apache.org/jira/browse/FLINK-14344
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently we snapshot the master hook state synchronously. As a part of 
> reworking the threading model of {{CheckpointCoordinator}}, we have to make 
> this non-blocking to satisfy the requirement of running in the main thread.
> The behavior of snapshotting master hook state is similar to task state 
> snapshotting. Master state snapshotting is taken before task state 
> snapshotting, because the master hook might perform external system 
> initialization that task state snapshotting depends on.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14344) A preparation for snapshotting master hook state asynchronously

2019-11-13 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16973128#comment-16973128
 ] 

Biao Liu commented on FLINK-14344:
--

[~pnowojski], makes sense. I have updated the title. The remaining part of 
"snapshot master hook state asynchronously" would be included in FLINK-13905.

> A preparation for snapshotting master hook state asynchronously
> ---
>
> Key: FLINK-14344
> URL: https://issues.apache.org/jira/browse/FLINK-14344
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently we snapshot the master hook state synchronously. As a part of 
> reworking the threading model of {{CheckpointCoordinator}}, we have to make 
> this non-blocking to satisfy the requirement of running in the main thread.
> The behavior of snapshotting master hook state is similar to task state 
> snapshotting. Master state snapshotting is taken before task state 
> snapshotting, because the master hook might perform external system 
> initialization that task state snapshotting depends on.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14950) Support getKey in WindowOperator.Context

2019-11-26 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16983131#comment-16983131
 ] 

Biao Liu commented on FLINK-14950:
--

Hi Jiayi,

It's an interesting ticket. Could you provide more details about your scenario?
For example:
1. Why do you need the key from the context?
2. Could it be satisfied by extracting the key field from an element stored in 
window state? (A rough sketch of this idea is below.)
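
Roughly, the workaround could look like the following custom {{Trigger}} (the 
{{RoomEvent}} type and its {{getRoomId()}} method are hypothetical; this is 
only a sketch of the idea, not a recommendation):

{code:java}
// A hedged sketch: since each element already carries the key field, a custom
// Trigger can copy it into per-window state in onElement() and read it back
// when a timer fires, instead of needing getKey() on the context.
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

class KeyAwareTrigger extends Trigger<RoomEvent, TimeWindow> {

    private final ValueStateDescriptor<String> keyDesc =
            new ValueStateDescriptor<>("window-key", Types.STRING);

    @Override
    public TriggerResult onElement(RoomEvent element, long timestamp, TimeWindow window,
                                   TriggerContext ctx) throws Exception {
        // remember the key extracted from the element itself
        ctx.getPartitionedState(keyDesc).update(element.getRoomId());
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window,
                                          TriggerContext ctx) throws Exception {
        ValueState<String> keyState = ctx.getPartitionedState(keyDesc);
        String key = keyState.value(); // the key, without any getKey() on the context
        // decide how to deal with the window based on the key ...
        return TriggerResult.FIRE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
        ctx.getPartitionedState(keyDesc).clear();
    }
}

// hypothetical event type carrying the key field
class RoomEvent {
    String roomId;
    String getRoomId() { return roomId; }
}
{code}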

> Support getKey in WindowOperator.Context
> 
>
> Key: FLINK-14950
> URL: https://issues.apache.org/jira/browse/FLINK-14950
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Affects Versions: 1.9.1
>Reporter: Jiayi Liao
>Priority: Major
>
> In our scenario, the user needs to access the key of {{WindowOperator.Context}} 
> to determine how to deal with the window.
> I think it's reasonable to support a {{getKey()}} method in 
> {{WindowOperator.Context}}. 
> cc [~aljoscha]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14971) Move ACK and declined message handling in the same thread with triggering

2019-11-27 Thread Biao Liu (Jira)
Biao Liu created FLINK-14971:


 Summary: Move ACK and declined message handling in the same thread 
with triggering
 Key: FLINK-14971
 URL: https://issues.apache.org/jira/browse/FLINK-14971
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Checkpointing
Reporter: Biao Liu
 Fix For: 1.10.0


Currently the ACK and declined message handling is executed in the IO thread. It 
should eventually be moved into the main thread.
After this step, all operations can be executed in the main thread, and the 
coordinator-wide lock is no longer needed.
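
As a rough sketch (the class and method names here are illustrative, not the 
actual {{CheckpointCoordinator}} API), the handling would simply be 
re-dispatched from the RPC/IO thread to the main thread executor:

{code:java}
// A hedged sketch: instead of handling the ACK in the IO thread under a
// coordinator-wide lock, the message is re-dispatched to the single
// main-thread executor, which makes the lock unnecessary.
import java.util.concurrent.Executor;

class AckHandlingSketch {
    private final Executor mainThreadExecutor;

    AckHandlingSketch(Executor mainThreadExecutor) {
        this.mainThreadExecutor = mainThreadExecutor;
    }

    // called from an RPC/IO thread when a task acknowledges a checkpoint
    void receiveAcknowledgeMessage(long checkpointId, String taskId) {
        mainThreadExecutor.execute(() -> handleAckInMainThread(checkpointId, taskId));
    }

    private void handleAckInMainThread(long checkpointId, String taskId) {
        // all coordinator state is accessed only from the main thread here,
        // so the former coordinator-wide lock is no longer needed
    }
}
{code}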



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14950) Support getKey in WindowOperator.Context

2019-11-29 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16984853#comment-16984853
 ] 

Biao Liu commented on FLINK-14950:
--

Hi [~wind_ljy], thanks for the explanation.

I'm not sure I have fully understood your requirement, and I guess there are a lot 
of other details in your scenario. Regardless of those, it seems that you want 
periodic checking for each key (chat room) in the window operator. I'm wondering 
whether it's general enough. For example, what if someone else wants periodic 
checking based on other fields (not the key)? It seems a bit too specialized 
to me.

> Support getKey in WindowOperator.Context
> 
>
> Key: FLINK-14950
> URL: https://issues.apache.org/jira/browse/FLINK-14950
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Affects Versions: 1.9.1
>Reporter: Jiayi Liao
>Priority: Major
>
> In our scenario, the user needs to access the key of {{WindowOperator.Context}} 
> to determine how to deal with the window.
> I think it's reasonable to support a {{getKey()}} method in 
> {{WindowOperator.Context}}. 
> cc [~aljoscha]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14950) Support getKey in WindowOperator.Context

2019-11-29 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16984922#comment-16984922
 ] 

Biao Liu commented on FLINK-14950:
--

[~wind_ljy],

{quote}By the way, Since the key access is not allowed we have to call 
context.toString() and analyze the pattern to get the binding key.
{quote}
Sorry to hear that :(

I'm not saying it's a bad idea to make the key accessible in the window context. 
It's just that I don't see a strong reason to do so. Your case seems to be quite 
complicated, and I still have not fully understood your scenario. Even if you 
could get the key from the context, how would you remove state from all windows 
associated with the key? The trigger is per window, not per key.

{quote}Maybe we should wait for more responses from others.
{quote}
Yeah, it would be great if someone else could give us a better idea.

> Support getKey in WindowOperator.Context
> 
>
> Key: FLINK-14950
> URL: https://issues.apache.org/jira/browse/FLINK-14950
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Affects Versions: 1.9.1
>Reporter: Jiayi Liao
>Priority: Major
>
> In our scenario, the user needs to access the key of {{WindowOperator.Context}} 
> to determine how to deal with the window.
> I think it's reasonable to support a {{getKey()}} method in 
> {{WindowOperator.Context}}. 
> cc [~aljoscha]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-12095) Canceling vs cancelling

2019-04-03 Thread Biao Liu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16808473#comment-16808473
 ] 

Biao Liu commented on FLINK-12095:
--

Wow! impressive report (y)

> Canceling vs cancelling
> ---
>
> Key: FLINK-12095
> URL: https://issues.apache.org/jira/browse/FLINK-12095
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Mike Pedersen
>Priority: Minor
>
> British and American English is mixed in some places, leading to confusion 
> between "canceled" and "cancelled", and "canceling" and "cancelling".
> It is called "canceling" in the ExecutionState enum: 
> https://github.com/apache/flink/blob/8ed85fe49b7595546a8f968e0faa1fa7d4da47ec/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java
> But for example here, it is incorrectly called "cancelling": 
> https://ci.apache.org/projects/flink/flink-docs-stable/internals/job_scheduling.html
> Likewise in the state diagram of the ExecutionState itself: 
> https://github.com/apache/flink/blob/8ed85fe49b7595546a8f968e0faa1fa7d4da47ec/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java#L31
> Same for the /jobs REST schema. It is unclear if this is due to incorrect 
> documentation or a misspelling in the JSON output, i.e. whether the schema is 
> wrong, or correct but misspelled.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

