[GitHub] [flink] pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperat

2019-09-11 Thread GitBox
pnowojski commented on a change in pull request #9564: 
[FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox 
for timer triggers, checkpoints and AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9564#discussion_r323523245
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ##
 @@ -428,6 +429,10 @@ public final void invoke() throws Exception {
// See FLINK-7430
isRunning = false;
}
+   MailboxExecutor mainMailboxExecutor = 
mailboxProcessor.getMainMailboxExecutor();
 
 Review comment:
   Can we add some unit test in `StreamTaskTest`? Relaying on a different 
module/package/class to provide test coverage this will be more fragile in the 
future.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperat

2019-09-09 Thread GitBox
pnowojski commented on a change in pull request #9564: 
[FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox 
for timer triggers, checkpoints and AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9564#discussion_r322080057
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 ##
 @@ -1380,6 +1398,54 @@ protected void 
performDefaultAction(DefaultActionContext context) throws Excepti
}
}
 
+   /**
+* A task that checks that some StreamTask methods are called only in 
the main task's thread.
 
 Review comment:
   I would rephrase this comment and move it above `testThreadInvariants`. 
Otherwise it's quite difficult to understand what this test is suppose to do.
   
   Also could you copy paste this also as a commit message?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperat

2019-09-09 Thread GitBox
pnowojski commented on a change in pull request #9564: 
[FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox 
for timer triggers, checkpoints and AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9564#discussion_r322087304
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
 ##
 @@ -261,10 +261,9 @@ public void abortCheckpointOnBarrier(long checkpointId, 
Throwable cause) throws
 * Invoked when a checkpoint has been completed, i.e., when the 
checkpoint coordinator has received
 
 Review comment:
   @1u0 ? (You have resolved the comment, but haven't responded and haven't 
updated the commit message) 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperat

2019-09-09 Thread GitBox
pnowojski commented on a change in pull request #9564: 
[FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox 
for timer triggers, checkpoints and AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9564#discussion_r322079010
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ##
 @@ -428,6 +429,10 @@ public final void invoke() throws Exception {
// See FLINK-7430
isRunning = false;
}
+   MailboxExecutor mainMailboxExecutor = 
mailboxProcessor.getMainMailboxExecutor();
 
 Review comment:
   Was this addressed @1u0?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperat

2019-09-09 Thread GitBox
pnowojski commented on a change in pull request #9564: 
[FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox 
for timer triggers, checkpoints and AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9564#discussion_r322111247
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java
 ##
 @@ -80,9 +88,14 @@ public void run() {
try {
while (running) {
LOG.debug("Wait for next completed async stream 
element result.");
-   AsyncResult streamElementEntry = 
streamElementQueue.peekBlockingly();
-
-   output(streamElementEntry);
+   AsyncResult asyncResult = 
streamElementQueue.peekBlockingly();
+   executor.submit(() -> {
+   try {
+   output(asyncResult);
+   } catch (InterruptedException e) {
 
 Review comment:
   Why do we need this `InterruptedException`? I see that it's coming (and it 
is the same in master branch) from `streamElementQueue.poll()`, however I don't 
understand it:
   1. We shouldn't be blocking neither in the mailbox thread (this version) nor 
under the checkpoint lock (master version), so this already looks suspicious.
   2. `poll` method that blocks is already "mildly" surprising contract on it's 
own
   3. From the code it looks like this `poll` call should never block and 
changing it to a proper poll (returning `Optional` or `null`) and adding a 
check state that the element is present, would work just as well but would be 
better/cleaner.
   
   @AHeise I remember that you were also rising some objections about the 
`Emitter` code and that it could be simplified and you wanted to work on that 
later. Did you have this in mind as well?
   
   Am I missing something? If not can you @AHeise create a JIRA ticket for 
cleaning up those things?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperat

2019-09-02 Thread GitBox
pnowojski commented on a change in pull request #9564: 
[FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox 
for timer triggers, checkpoints and AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9564#discussion_r319896679
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
 ##
 @@ -1139,17 +1138,9 @@ public void triggerCheckpointBarrier(
 
if (executionState == ExecutionState.RUNNING && invokable != 
null) {
 
-   // build a local closure
-   final SafetyNetCloseableRegistry 
safetyNetCloseableRegistry =
-   
FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();
-
Runnable runnable = new Runnable() {
@Override
public void run() {
-   // set safety net from the task's 
context for checkpointing thread
 
 Review comment:
   To me it looks like 
`FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread` should be called 
by any thread that will be executing some/any part of the code that used to be 
invoked in `invokable.triggerCheckpoint`. This is the entry point, where safety 
net registry was being set, and then it was being used implicitly via static 
variables in `SnapshotDirectory` via `FileSystem#getLocalFileSystem`. 
   
   > we are not calling any user code and file system operations in the 
dispatcher thread anymore.
   
   I guess yes, probably setting it here now doesn't make sense, but it 
shouldn't be just removed from here, but moved used inside the mailbox action? 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperat

2019-09-02 Thread GitBox
pnowojski commented on a change in pull request #9564: 
[FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox 
for timer triggers, checkpoints and AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9564#discussion_r319833048
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
 ##
 @@ -1139,17 +1138,9 @@ public void triggerCheckpointBarrier(
 
if (executionState == ExecutionState.RUNNING && invokable != 
null) {
 
-   // build a local closure
-   final SafetyNetCloseableRegistry 
safetyNetCloseableRegistry =
-   
FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();
-
Runnable runnable = new Runnable() {
@Override
public void run() {
-   // set safety net from the task's 
context for checkpointing thread
 
 Review comment:
   > Do you have any reasons to keep it?
   
   I don't know this code and I'm not sure how it's suppose to work, but I see 
that it's being used 
(`FileSystemSafetyNet#initializeSafetyNetForThread/closeSafetyNetAndGuardedResourcesForThread/wrapWithSafetyNetWhenActivated`).
 Since you were removing it, I thought that you have investigated how is it 
suppose to be working and that's safe to remove it (as I'm pretty sure that 
such safety feature has a very poor test coverage).
   
   > For the second part of your comment, I don't think it would be correct to 
allow any other (non main task thread) to get letters from the mailbox. And so 
far, we haven't raised any reason to have such requirement (that letters can be 
run by some other threads).
   
   I guess that's right as long as we do not allow for yielding in the legacy 
source threads. Do we have a `checkstate` somewhere for that? Somewhere Inside 
yield a `checkState(isMailboxThread())`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperat

2019-09-02 Thread GitBox
pnowojski commented on a change in pull request #9564: 
[FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox 
for timer triggers, checkpoints and AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9564#discussion_r319830496
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
 ##
 @@ -1151,7 +1151,10 @@ public void run() {

FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);
 
try {
-   
invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions, 
advanceToEndOfEventTime);
+   
invokable.triggerCheckpointAsync(checkpointMetaData, checkpointOptions, 
advanceToEndOfEventTime);
 
 Review comment:
   I would guess that there is only one RPC thread, however I do not see an 
issue here even if they weren't? Those actions would be ordered on the mailbox, 
the same way they were ordered on the `asyncCallDispatcher`. If there was a 
race on the RPC thread between requests, there would be the same situation now, 
just against enqueuing into a different single threaded executor.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperat

2019-09-02 Thread GitBox
pnowojski commented on a change in pull request #9564: 
[FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox 
for timer triggers, checkpoints and AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9564#discussion_r319828976
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorFactory.java
 ##
 @@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.YieldingOperatorFactory;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import 
org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxExecutor;
+
+/**
+ * The factory of {@link AsyncWaitOperator}.
+ *
+ * @param  The output type of the operator
+ */
+public class AsyncWaitOperatorFactory implements 
OneInputStreamOperatorFactory, YieldingOperatorFactory {
+   private final AsyncFunction asyncFunction;
+   private final long timeout;
+   private final int capacity;
+   private final AsyncDataStream.OutputMode outputMode;
+   private MailboxExecutor mailboxExecutor;
+   private ChainingStrategy strategy = ChainingStrategy.HEAD;
+
+   public AsyncWaitOperatorFactory(
+   AsyncFunction asyncFunction,
+   long timeout,
+   int capacity,
+   AsyncDataStream.OutputMode outputMode) {
+   this.asyncFunction = asyncFunction;
+   this.timeout = timeout;
+   this.capacity = capacity;
+   this.outputMode = outputMode;
+   }
+
+   @Override public void setMailboxExecutor(MailboxExecutor 
mailboxExecutor) {
+   this.mailboxExecutor = mailboxExecutor;
+   }
+
+   @Override public StreamOperator createStreamOperator(StreamTask 
containingTask, StreamConfig config,
+   Output output) {
+   AsyncWaitOperator asyncWaitOperator = new AsyncWaitOperator(
+   asyncFunction,
+   timeout,
+   capacity,
+   outputMode,
+   mailboxExecutor);
+   asyncWaitOperator.setup(containingTask, config, output);
 
 Review comment:
   Yes, I meant AsyncWaitOperator :) I copied/pated wrong class name.
   
   > I can create a follow up ticket for that 
   
   Please do so and can you CC me in the ticket? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperat

2019-08-30 Thread GitBox
pnowojski commented on a change in pull request #9564: 
[FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox 
for timer triggers, checkpoints and AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9564#discussion_r319468648
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ##
 @@ -380,9 +380,6 @@ public final void invoke() throws Exception {
//  Invoke 
 
 Review comment:
   Can we re-order the commits, that all of the refactors/preparations commits 
like:
   - open mailbox early
   - drain mailbox
   - "Move decline checkpoint handling into StreamTask" 
   - ...
   
   Come before the `AsyncWaitOperator` and `Timers`? In that case we would have 
only a sequence of 3 hard linked commits (AsyncWaitOperator, timers, 
checkpoints), instead of 8? 9? What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperat

2019-08-30 Thread GitBox
pnowojski commented on a change in pull request #9564: 
[FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox 
for timer triggers, checkpoints and AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9564#discussion_r319473779
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
 ##
 @@ -1151,7 +1151,10 @@ public void run() {

FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);
 
try {
-   
invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions, 
advanceToEndOfEventTime);
+   
invokable.triggerCheckpointAsync(checkpointMetaData, checkpointOptions, 
advanceToEndOfEventTime);
 
 Review comment:
   also hmmm, this is a bit strange pattern. We enqueue some action in a single 
threaded `asyncCallDispatcher`, which has mostly only one thing to do: enqueue 
something in the mailbox. Can not we simply drop `executeAsyncCallRunnable` and 
`asyncCallDispatcher` and go directly to the mailbox?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperat

2019-08-30 Thread GitBox
pnowojski commented on a change in pull request #9564: 
[FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox 
for timer triggers, checkpoints and AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9564#discussion_r319471388
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
 ##
 @@ -1151,7 +1151,10 @@ public void run() {

FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);
 
try {
-   
invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions, 
advanceToEndOfEventTime);
+   
invokable.triggerCheckpointAsync(checkpointMetaData, checkpointOptions, 
advanceToEndOfEventTime);
+   }
+   catch (RejectedExecutionException ex) {
+   // This may happen if the 
mailbox is closed. It means that the task is shutting down, so we just ignore 
it.
 
 Review comment:
   should we log this?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperat

2019-08-30 Thread GitBox
pnowojski commented on a change in pull request #9564: 
[FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox 
for timer triggers, checkpoints and AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9564#discussion_r319453203
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
 ##
 @@ -305,14 +305,8 @@ public void testWaterMarkUnordered() throws Exception {
}
 
private void testEventTime(AsyncDataStream.OutputMode mode) throws 
Exception {
-   final AsyncWaitOperator operator = new 
AsyncWaitOperator<>(
-   new MyAsyncFunction(),
-   TIMEOUT,
-   2,
-   mode);
-
-   final OneInputStreamOperatorTestHarness 
testHarness =
-   new 
OneInputStreamOperatorTestHarness<>(operator, IntSerializer.INSTANCE);
+   final OneInputStreamOperatorTestHarness 
testHarness = createTestHarness(
+   new MyAsyncFunction(), TIMEOUT, 2, mode);
 
 Review comment:
   nit: can you restore previous formatting?
   
   
https://flink.apache.org/contributing/code-style-and-quality-formatting.html#breaking-the-lines-of-too-long-statements
   
   > If you break the line then each argument/call should have a separate line, 
including the first one


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperat

2019-08-30 Thread GitBox
pnowojski commented on a change in pull request #9564: 
[FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox 
for timer triggers, checkpoints and AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9564#discussion_r319478576
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
 ##
 @@ -261,10 +261,9 @@ public void abortCheckpointOnBarrier(long checkpointId, 
Throwable cause) throws
 * Invoked when a checkpoint has been completed, i.e., when the 
checkpoint coordinator has received
 
 Review comment:
   > [FLINK-12482][runtime] Move CheckpointListener.notifyCheckpointComplete 
call into StreamTask
   
   Why did we need this change? Can you explain the reason behind this commit 
inside the commit message?
   
   Maybe it should be squashed with the following commit?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperat

2019-08-30 Thread GitBox
pnowojski commented on a change in pull request #9564: 
[FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox 
for timer triggers, checkpoints and AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9564#discussion_r319467875
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ##
 @@ -428,6 +429,10 @@ public final void invoke() throws Exception {
// See FLINK-7430
isRunning = false;
}
+   MailboxExecutor mainMailboxExecutor = 
mailboxProcessor.getMainMailboxExecutor();
 
 Review comment:
   +1
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperat

2019-08-30 Thread GitBox
pnowojski commented on a change in pull request #9564: 
[FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox 
for timer triggers, checkpoints and AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9564#discussion_r319465635
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ##
 @@ -64,6 +64,7 @@
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 
 Review comment:
   +1


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperat

2019-08-30 Thread GitBox
pnowojski commented on a change in pull request #9564: 
[FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox 
for timer triggers, checkpoints and AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9564#discussion_r319463486
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorFactory.java
 ##
 @@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.YieldingOperatorFactory;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import 
org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxExecutor;
+
+/**
+ * The factory of {@link AsyncWaitOperator}.
+ *
+ * @param  The output type of the operator
+ */
+public class AsyncWaitOperatorFactory implements 
OneInputStreamOperatorFactory, YieldingOperatorFactory {
+   private final AsyncFunction asyncFunction;
+   private final long timeout;
+   private final int capacity;
+   private final AsyncDataStream.OutputMode outputMode;
+   private MailboxExecutor mailboxExecutor;
+   private ChainingStrategy strategy = ChainingStrategy.HEAD;
+
+   public AsyncWaitOperatorFactory(
+   AsyncFunction asyncFunction,
+   long timeout,
+   int capacity,
+   AsyncDataStream.OutputMode outputMode) {
+   this.asyncFunction = asyncFunction;
+   this.timeout = timeout;
+   this.capacity = capacity;
+   this.outputMode = outputMode;
+   }
+
+   @Override public void setMailboxExecutor(MailboxExecutor 
mailboxExecutor) {
 
 Review comment:
   nit: `@Override` in a separate line?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperat

2019-08-30 Thread GitBox
pnowojski commented on a change in pull request #9564: 
[FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox 
for timer triggers, checkpoints and AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9564#discussion_r319479865
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
 ##
 @@ -1183,8 +1183,12 @@ public void notifyCheckpointComplete(final long 
checkpointID) {
@Override
public void run() {
try {
-   
invokable.notifyCheckpointComplete(checkpointID);
-   } catch (Throwable t) {
+   
invokable.notifyCheckpointCompleteAsync(checkpointID);
 
 Review comment:
   ditto about the double "mailbox" pattern?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperat

2019-08-30 Thread GitBox
pnowojski commented on a change in pull request #9564: 
[FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox 
for timer triggers, checkpoints and AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9564#discussion_r319463940
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorFactory.java
 ##
 @@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.YieldingOperatorFactory;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import 
org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxExecutor;
+
+/**
+ * The factory of {@link AsyncWaitOperator}.
+ *
+ * @param  The output type of the operator
+ */
+public class AsyncWaitOperatorFactory implements 
OneInputStreamOperatorFactory, YieldingOperatorFactory {
+   private final AsyncFunction asyncFunction;
+   private final long timeout;
+   private final int capacity;
+   private final AsyncDataStream.OutputMode outputMode;
+   private MailboxExecutor mailboxExecutor;
+   private ChainingStrategy strategy = ChainingStrategy.HEAD;
+
+   public AsyncWaitOperatorFactory(
+   AsyncFunction asyncFunction,
+   long timeout,
+   int capacity,
+   AsyncDataStream.OutputMode outputMode) {
+   this.asyncFunction = asyncFunction;
+   this.timeout = timeout;
+   this.capacity = capacity;
+   this.outputMode = outputMode;
+   }
+
+   @Override public void setMailboxExecutor(MailboxExecutor 
mailboxExecutor) {
+   this.mailboxExecutor = mailboxExecutor;
+   }
+
+   @Override public StreamOperator createStreamOperator(StreamTask 
containingTask, StreamConfig config,
+   Output output) {
+   AsyncWaitOperator asyncWaitOperator = new AsyncWaitOperator(
+   asyncFunction,
+   timeout,
+   capacity,
+   outputMode,
+   mailboxExecutor);
+   asyncWaitOperator.setup(containingTask, config, output);
 
 Review comment:
   setup method is no longer needed with `AsyncWaitOperatorFactory` and whole 
`AsyncWaitOperatorFactory` could be cleaned up: all fields could be made non 
`transient` and `final`.
   
   If it's too big clean up to be done within this PR, can you create a ticket 
to do this?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperat

2019-08-30 Thread GitBox
pnowojski commented on a change in pull request #9564: 
[FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox 
for timer triggers, checkpoints and AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9564#discussion_r319464910
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
 ##
 @@ -50,29 +50,21 @@
 
// 

 
-   /** The containing task that owns this time service provider. */
-   private final AsyncExceptionHandler task;
-
-   /** The lock that timers acquire upon triggering. */
-   private final Object checkpointLock;
-
/** The executor service that schedules and calls the triggers of this 
task. */
private final ScheduledThreadPoolExecutor timerService;
 
+   private final ScheduledCallbackExecutionContext 
callbackExecutionContext;
private final AtomicInteger status;
 
-   public SystemProcessingTimeService(AsyncExceptionHandler 
failureHandler, Object checkpointLock) {
-   this(failureHandler, checkpointLock, null);
+   @VisibleForTesting
+   SystemProcessingTimeService(ScheduledCallbackExecutionContext 
callbackExecutionContext) {
+   this(callbackExecutionContext, null);
}
 
public SystemProcessingTimeService(
-   AsyncExceptionHandler task,
-   Object checkpointLock,
-   ThreadFactory threadFactory) {
-
-   this.task = checkNotNull(task);
-   this.checkpointLock = checkNotNull(checkpointLock);
+   ScheduledCallbackExecutionContext callbackExecutionContext, 
ThreadFactory threadFactory) {
 
 Review comment:
   nit: ditto about wrapped parameters in one line.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperat

2019-08-30 Thread GitBox
pnowojski commented on a change in pull request #9564: 
[FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox 
for timer triggers, checkpoints and AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9564#discussion_r319476565
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
 ##
 @@ -1139,17 +1138,9 @@ public void triggerCheckpointBarrier(
 
if (executionState == ExecutionState.RUNNING && invokable != 
null) {
 
-   // build a local closure
-   final SafetyNetCloseableRegistry 
safetyNetCloseableRegistry =
-   
FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();
-
Runnable runnable = new Runnable() {
@Override
public void run() {
-   // set safety net from the task's 
context for checkpointing thread
 
 Review comment:
   Are you sure we can remove this? What was the purpose of this safety net? 
   
   Actually, shouldn't we set it inside the mailbox action? I guess this is set 
correctly for the main Task's thread, but this `triggerCheckpointAsync` code 
can be executed from a `LegacySourceThread` as well? (because of yielding 
execution)?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperat

2019-08-30 Thread GitBox
pnowojski commented on a change in pull request #9564: 
[FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox 
for timer triggers, checkpoints and AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9564#discussion_r319470555
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
 ##
 @@ -219,9 +220,12 @@ public ExecutionConfig getExecutionConfig() {
 * @param advanceToEndOfEventTime Flag indicating if the source should 
inject a {@code MAX_WATERMARK} in the pipeline
 *  to fire any registered event-time timers
 *
-* @return {@code false} if the checkpoint can not be carried out, 
{@code true} otherwise
+* @return future with value of {@code false} if the checkpoint was not 
carried out, {@code true} otherwise
 */
-   public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, 
CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) throws 
Exception {
+   public Future triggerCheckpointAsync(
+   CheckpointMetaData checkpointMetaData,
+   CheckpointOptions checkpointOptions,
+   boolean advanceToEndOfEventTime) {
 
 Review comment:
   nit: double indent the parameters or put a new line after `{`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperat

2019-08-30 Thread GitBox
pnowojski commented on a change in pull request #9564: 
[FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox 
for timer triggers, checkpoints and AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9564#discussion_r319461753
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
 ##
 @@ -1127,4 +1073,12 @@ public void testEndInput() throws Exception {
}
}
}
+
+   private static  OneInputStreamOperatorTestHarness 
createTestHarness(
+   AsyncFunction function, long timeout, int 
capacity, AsyncDataStream.OutputMode outputMode) throws Exception {
 
 Review comment:
   nit: wrap the parameters?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperat

2019-08-30 Thread GitBox
pnowojski commented on a change in pull request #9564: 
[FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox 
for timer triggers, checkpoints and AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9564#discussion_r319464273
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/EmitterTest.java
 ##
 @@ -194,4 +205,22 @@ public void testEmitterWithExceptions() throws Exception {
emitterThread.interrupt();
}
}
+
+   private static class MockExecutor implements MailboxExecutor {
+   @Override public void execute(@Nonnull Runnable command) throws 
RejectedExecutionException {
 
 Review comment:
   `@Override` in a separate line?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperat

2019-08-30 Thread GitBox
pnowojski commented on a change in pull request #9564: 
[FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox 
for timer triggers, checkpoints and AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9564#discussion_r319468278
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ##
 @@ -428,6 +429,10 @@ public final void invoke() throws Exception {
// See FLINK-7430
isRunning = false;
}
+   MailboxExecutor mainMailboxExecutor = 
mailboxProcessor.getMainMailboxExecutor();
 
 Review comment:
   Also we should have a dedicated unit test for that (unless it's too 
complicated to setup, but it would be better not to relay on ITCase's for this 
feature).


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperat

2019-08-30 Thread GitBox
pnowojski commented on a change in pull request #9564: 
[FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox 
for timer triggers, checkpoints and AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9564#discussion_r319472433
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
 ##
 @@ -1151,7 +1151,10 @@ public void run() {

FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);
 
try {
-   
invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions, 
advanceToEndOfEventTime);
+   
invokable.triggerCheckpointAsync(checkpointMetaData, checkpointOptions, 
advanceToEndOfEventTime);
 
 Review comment:
   missing `.get()`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperat

2019-08-30 Thread GitBox
pnowojski commented on a change in pull request #9564: 
[FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox 
for timer triggers, checkpoints and AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9564#discussion_r319480244
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
 ##
 @@ -1183,8 +1183,12 @@ public void notifyCheckpointComplete(final long 
checkpointID) {
@Override
public void run() {
try {
-   
invokable.notifyCheckpointComplete(checkpointID);
-   } catch (Throwable t) {
+   
invokable.notifyCheckpointCompleteAsync(checkpointID);
 
 Review comment:
   if we do not wait, then we will miss the error handling? 
   
   But please first respond to my comment about double/nested mailbox/enqueuing 
pattern.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperat

2019-08-30 Thread GitBox
pnowojski commented on a change in pull request #9564: 
[FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox 
for timer triggers, checkpoints and AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9564#discussion_r319467301
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ##
 @@ -1368,13 +1369,19 @@ private void 
checkpointStreamOperator(StreamOperator op) throws Exception {
 
private class TimerInvocationContext implements 
SystemProcessingTimeService.ScheduledCallbackExecutionContext {
@Override
-   public void invoke(ProcessingTimeCallback callback, long 
timestamp) {
-   synchronized (getCheckpointLock()) {
-   try {
-   callback.onProcessingTime(timestamp);
-   } catch (Throwable t) {
-   handleAsyncException("Caught exception 
while processing timer.", new TimerException(t));
-   }
+   public void invoke(ProcessingTimeCallback callback, long 
timestamp) throws InterruptedException {
+   try {
+   
mailboxProcessor.getMailboxExecutor(TaskMailbox.MAX_PRIORITY).execute(() -> {
+   synchronized (getCheckpointLock()) {
+   try {
+   
callback.onProcessingTime(timestamp);
+   } catch (Throwable t) {
+   
handleAsyncException("Caught exception while processing timer.", new 
TimerException(t));
+   }
+   }
+   });
+   } catch (Throwable t) {
 
 Review comment:
   Unfortunately not, this is I think the upper most layer in the stack trace. 
If we do not catch an exception here, it would be silently ignored. This made 
the deadlock that I was debugging last week non obvious to find.
   
   However maybe this deserves some comment here explaining this construct? Like
   
   ```
   // Inner try catch handles all errors during the execution of the action in 
the mailbox. Outer try catch handles errors that could happen during enqueuing 
the action.
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services