[GitHub] [flink] pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperat
pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperator URL: https://github.com/apache/flink/pull/9564#discussion_r323523245 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ## @@ -428,6 +429,10 @@ public final void invoke() throws Exception { // See FLINK-7430 isRunning = false; } + MailboxExecutor mainMailboxExecutor = mailboxProcessor.getMainMailboxExecutor(); Review comment: Can we add a unit test in `StreamTaskTest`? Relying on a different module/package/class to provide test coverage for this will be more fragile in the future. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperator URL: https://github.com/apache/flink/pull/9564#discussion_r322080057 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ## @@ -1380,6 +1398,54 @@ protected void performDefaultAction(DefaultActionContext context) throws Excepti } } + /** +* A task that checks that some StreamTask methods are called only in the main task's thread. Review comment: I would rephrase this comment and move it above `testThreadInvariants`. Otherwise it's quite difficult to understand what this test is supposed to do. Could you also copy it into the commit message?
pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperator URL: https://github.com/apache/flink/pull/9564#discussion_r322087304 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java ## @@ -261,10 +261,9 @@ public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) throws * Invoked when a checkpoint has been completed, i.e., when the checkpoint coordinator has received Review comment: @1u0 ? (You have resolved the comment, but haven't responded and haven't updated the commit message)
pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperator URL: https://github.com/apache/flink/pull/9564#discussion_r322079010 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ## @@ -428,6 +429,10 @@ public final void invoke() throws Exception { // See FLINK-7430 isRunning = false; } + MailboxExecutor mainMailboxExecutor = mailboxProcessor.getMainMailboxExecutor(); Review comment: Was this addressed @1u0?
pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperator URL: https://github.com/apache/flink/pull/9564#discussion_r322111247 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java ## @@ -80,9 +88,14 @@ public void run() { try { while (running) { LOG.debug("Wait for next completed async stream element result."); - AsyncResult streamElementEntry = streamElementQueue.peekBlockingly(); - - output(streamElementEntry); + AsyncResult asyncResult = streamElementQueue.peekBlockingly(); + executor.submit(() -> { + try { + output(asyncResult); + } catch (InterruptedException e) { Review comment: Why do we need this `InterruptedException`? I see that it's coming (and it is the same in the master branch) from `streamElementQueue.poll()`, however I don't understand it: 1. We shouldn't be blocking either in the mailbox thread (this version) or under the checkpoint lock (master version), so this already looks suspicious. 2. A `poll` method that blocks is already a "mildly" surprising contract on its own. 3. From the code it looks like this `poll` call should never block, and changing it to a proper poll (returning `Optional` or `null`) and adding a state check that the element is present would work just as well but would be better/cleaner. @AHeise I remember that you were also raising some objections about the `Emitter` code, that it could be simplified, and that you wanted to work on that later. Did you have this in mind as well? Am I missing something? If not, can you @AHeise create a JIRA ticket for cleaning up those things?
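A minimal sketch of the "proper poll" idea from point 3, assuming a hypothetical queue class (`NonBlockingQueueSketch` is not Flink's `StreamElementQueue`, and all of its method names are made up for illustration). The `poll` here never blocks and returns an `Optional`, and the caller fails fast with a state check instead of handling `InterruptedException`:

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;

/**
 * Hypothetical stand-in for a queue of completed async results.
 * Not thread-safe: it assumes a single consumer thread, as in a mailbox model.
 */
public class NonBlockingQueueSketch<T> {
    private final Queue<T> completed = new ArrayDeque<>();

    /** Registers an element whose async result is already complete. */
    public void addCompleted(T element) {
        completed.add(element);
    }

    /** Returns the next completed element without blocking, or empty if none is ready. */
    public Optional<T> poll() {
        return Optional.ofNullable(completed.poll());
    }

    /** State check: the caller expects an element to be present, so absence is a bug. */
    public T emitNext() {
        return poll().orElseThrow(
            () -> new IllegalStateException("Expected a completed element to be present"));
    }

    public static void main(String[] args) {
        NonBlockingQueueSketch<String> queue = new NonBlockingQueueSketch<>();
        queue.addCompleted("result-1");
        System.out.println(queue.emitNext());
        System.out.println(queue.poll().isPresent());
    }
}
```

Because nothing in this sketch can block, no checked `InterruptedException` leaks into the code that runs inside the mailbox action.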
pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperator URL: https://github.com/apache/flink/pull/9564#discussion_r319896679 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ## @@ -1139,17 +1138,9 @@ public void triggerCheckpointBarrier( if (executionState == ExecutionState.RUNNING && invokable != null) { - // build a local closure - final SafetyNetCloseableRegistry safetyNetCloseableRegistry = - FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread(); - Runnable runnable = new Runnable() { @Override public void run() { - // set safety net from the task's context for checkpointing thread Review comment: To me it looks like `FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread` should be called by any thread that will be executing any part of the code that used to be invoked in `invokable.triggerCheckpoint`. This is the entry point where the safety net registry was being set, and then it was being used implicitly via static variables in `SnapshotDirectory` via `FileSystem#getLocalFileSystem`. > we are not calling any user code and file system operations in the dispatcher thread anymore. I guess yes, setting it here probably doesn't make sense now, but it shouldn't just be removed from here; shouldn't it be moved and used inside the mailbox action?
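The concern above is about per-thread state: a value set on the submitting thread is not visible to the thread that later runs the action. A minimal sketch of that effect, assuming a plain `ThreadLocal<String>` as a hypothetical stand-in for the safety net registry and a single-threaded executor as a stand-in for the mailbox thread (none of these names are Flink APIs):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadLocalRegistrySketch {
    // Hypothetical stand-in for a per-thread closeable registry.
    private static final ThreadLocal<String> REGISTRY = new ThreadLocal<>();

    /** Sets the registry on the calling thread only; the action on the worker thread sees null. */
    static String setOnCallerOnly(String registry) {
        REGISTRY.set(registry);
        try {
            return runOnMailbox(() -> REGISTRY.get());
        } finally {
            REGISTRY.remove();
        }
    }

    /** Sets the registry inside the action, on the thread that actually executes it. */
    static String setInsideAction(String registry) {
        return runOnMailbox(() -> {
            REGISTRY.set(registry);
            try {
                return REGISTRY.get(); // code that reads the registry implicitly
            } finally {
                REGISTRY.remove();     // do not leak the value to later actions
            }
        });
    }

    /** Runs the action on a fresh single-threaded executor and returns its result. */
    private static String runOnMailbox(Callable<String> action) {
        ExecutorService mailbox = Executors.newSingleThreadExecutor();
        try {
            return mailbox.submit(action).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            mailbox.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(setOnCallerOnly("task-registry"));
        System.out.println(setInsideAction("task-registry"));
    }
}
```

In the sketch, only `setInsideAction` lets the executing thread observe the registry, which is the shape the comment asks for: capture the value where it is known and install it inside the enqueued action.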
pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperator URL: https://github.com/apache/flink/pull/9564#discussion_r319833048 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ## @@ -1139,17 +1138,9 @@ public void triggerCheckpointBarrier( if (executionState == ExecutionState.RUNNING && invokable != null) { - // build a local closure - final SafetyNetCloseableRegistry safetyNetCloseableRegistry = - FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread(); - Runnable runnable = new Runnable() { @Override public void run() { - // set safety net from the task's context for checkpointing thread Review comment: > Do you have any reasons to keep it? I don't know this code and I'm not sure how it's supposed to work, but I see that it's being used (`FileSystemSafetyNet#initializeSafetyNetForThread/closeSafetyNetAndGuardedResourcesForThread/wrapWithSafetyNetWhenActivated`). Since you were removing it, I thought that you had investigated how it is supposed to work and that it's safe to remove (as I'm pretty sure that such a safety feature has very poor test coverage). > For the second part of your comment, I don't think it would be correct to allow any other (non main task thread) to get letters from the mailbox. And so far, we haven't raised any reason to have such requirement (that letters can be run by some other threads). I guess that's right as long as we do not allow yielding in the legacy source threads. Do we have a `checkState` somewhere for that? For example, a `checkState(isMailboxThread())` inside yield?
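A minimal sketch of the thread check asked about above, assuming a hypothetical mailbox class (`MailboxThreadCheckSketch`, `tryYield` and `foreignThreadIsRejected` are illustrative names, not Flink's `MailboxProcessor` API). Any thread may enqueue a letter, but only the owning thread may run one:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical single-consumer mailbox; a sketch only. */
public class MailboxThreadCheckSketch {
    private final Thread mailboxThread;
    private final Queue<Runnable> letters = new ArrayDeque<>();

    public MailboxThreadCheckSketch(Thread mailboxThread) {
        this.mailboxThread = mailboxThread;
    }

    public boolean isMailboxThread() {
        return Thread.currentThread() == mailboxThread;
    }

    /** Any thread may enqueue a letter. */
    public synchronized void submit(Runnable letter) {
        letters.add(letter);
    }

    /** Runs one queued letter if available; only the mailbox thread may call this. */
    public boolean tryYield() {
        if (!isMailboxThread()) {
            throw new IllegalStateException("yield must be called from the mailbox thread");
        }
        Runnable letter;
        synchronized (this) {
            letter = letters.poll();
        }
        if (letter == null) {
            return false;
        }
        letter.run();
        return true;
    }

    /** Helper for the demo: tries to yield from a new thread and reports whether it was rejected. */
    static boolean foreignThreadIsRejected(MailboxThreadCheckSketch mailbox) {
        AtomicBoolean rejected = new AtomicBoolean(false);
        Thread other = new Thread(() -> {
            try {
                mailbox.tryYield();
            } catch (IllegalStateException e) {
                rejected.set(true);
            }
        });
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return rejected.get();
    }

    public static void main(String[] args) {
        MailboxThreadCheckSketch mailbox = new MailboxThreadCheckSketch(Thread.currentThread());
        mailbox.submit(() -> System.out.println("letter executed"));
        System.out.println(mailbox.tryYield());
        System.out.println(foreignThreadIsRejected(mailbox));
    }
}
```

With such a check in place, a legacy source thread that tried to yield would fail immediately rather than silently running letters off the main task thread.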
pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperator URL: https://github.com/apache/flink/pull/9564#discussion_r319830496 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ## @@ -1151,7 +1151,10 @@ public void run() { FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry); try { - invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime); + invokable.triggerCheckpointAsync(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime); Review comment: I would guess that there is only one RPC thread; however, I do not see an issue here even if there were several. Those actions would be ordered on the mailbox, the same way they were ordered on the `asyncCallDispatcher`. If there was a race on the RPC thread between requests, there would be the same situation now, just against enqueuing into a different single-threaded executor.
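The ordering argument can be illustrated with a plain `java.util.concurrent` single-threaded executor standing in for either the dispatcher or the mailbox (an assumption for illustration; the class and method names below are hypothetical). Actions submitted from one thread run in submission order, whichever single-threaded executor they are enqueued into:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SingleThreadOrderingSketch {

    /** Submits n numbered actions from the calling thread and returns the order in which they ran. */
    static List<Integer> runInOrder(int n) {
        ExecutorService mailbox = Executors.newSingleThreadExecutor();
        List<Integer> executed = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < n; i++) {
            final int id = i;
            mailbox.execute(() -> executed.add(id));
        }
        mailbox.shutdown(); // already queued actions still run after shutdown()
        try {
            mailbox.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(executed);
    }

    public static void main(String[] args) {
        System.out.println(runInOrder(5));
    }
}
```

If several threads submitted concurrently, the relative order of their submissions would be decided at enqueue time, which is the same race regardless of which queue receives the actions.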
pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperator URL: https://github.com/apache/flink/pull/9564#discussion_r319828976 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorFactory.java ## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.functions.async.AsyncFunction; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.YieldingOperatorFactory; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxExecutor; + +/** + * The factory of {@link AsyncWaitOperator}. + * + * @param The output type of the operator + */ +public class AsyncWaitOperatorFactory implements OneInputStreamOperatorFactory, YieldingOperatorFactory { + private final AsyncFunction asyncFunction; + private final long timeout; + private final int capacity; + private final AsyncDataStream.OutputMode outputMode; + private MailboxExecutor mailboxExecutor; + private ChainingStrategy strategy = ChainingStrategy.HEAD; + + public AsyncWaitOperatorFactory( + AsyncFunction asyncFunction, + long timeout, + int capacity, + AsyncDataStream.OutputMode outputMode) { + this.asyncFunction = asyncFunction; + this.timeout = timeout; + this.capacity = capacity; + this.outputMode = outputMode; + } + + @Override public void setMailboxExecutor(MailboxExecutor mailboxExecutor) { + this.mailboxExecutor = mailboxExecutor; + } + + @Override public StreamOperator createStreamOperator(StreamTask containingTask, StreamConfig config, + Output output) { + AsyncWaitOperator asyncWaitOperator = new AsyncWaitOperator( + asyncFunction, + timeout, + capacity, + outputMode, + mailboxExecutor); + asyncWaitOperator.setup(containingTask, config, output); Review comment: Yes, I meant AsyncWaitOperator :) I 
copied/pasted the wrong class name. > I can create a follow up ticket for that Please do so, and can you CC me in the ticket?
pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperator URL: https://github.com/apache/flink/pull/9564#discussion_r319468648 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ## @@ -380,9 +380,6 @@ public final void invoke() throws Exception { // Invoke Review comment: Can we re-order the commits so that all of the refactoring/preparation commits, like: - open mailbox early - drain mailbox - "Move decline checkpoint handling into StreamTask" - ... come before the `AsyncWaitOperator` and `Timers` ones? In that case we would have a sequence of only 3 hard-linked commits (AsyncWaitOperator, timers, checkpoints), instead of 8? 9? What do you think?
pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperator URL: https://github.com/apache/flink/pull/9564#discussion_r319473779 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ## @@ -1151,7 +1151,10 @@ public void run() { FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry); try { - invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime); + invokable.triggerCheckpointAsync(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime); Review comment: Also, hmm, this is a bit of a strange pattern. We enqueue some action in a single-threaded `asyncCallDispatcher`, which mostly has only one thing to do: enqueue something in the mailbox. Can't we simply drop `executeAsyncCallRunnable` and `asyncCallDispatcher` and go directly to the mailbox?
pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperator URL: https://github.com/apache/flink/pull/9564#discussion_r319471388 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ## @@ -1151,7 +1151,10 @@ public void run() { FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry); try { - invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime); + invokable.triggerCheckpointAsync(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime); + } + catch (RejectedExecutionException ex) { + // This may happen if the mailbox is closed. It means that the task is shutting down, so we just ignore it. Review comment: should we log this?
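A minimal sketch of what logging the rejection could look like, assuming a shut-down `ExecutorService` as a stand-in for a closed mailbox and `java.util.logging` in place of the project's own logger so that the example stays self-contained (class and method names are hypothetical):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;

public class RejectedSubmissionLoggingSketch {
    private static final Logger LOG =
        Logger.getLogger(RejectedSubmissionLoggingSketch.class.getName());

    /** Tries to enqueue the action; logs at a low level and returns false if the executor is closed. */
    static boolean trySubmit(ExecutorService mailbox, Runnable action) {
        try {
            mailbox.execute(action);
            return true;
        } catch (RejectedExecutionException e) {
            // Expected during shutdown, so it is logged quietly rather than swallowed silently.
            LOG.log(Level.FINE, "Action rejected because the executor is closed; ignoring.", e);
            return false;
        }
    }

    /** Demo helper: submits to an executor that has already been shut down. */
    static boolean submitAfterShutdown() {
        ExecutorService mailbox = Executors.newSingleThreadExecutor();
        mailbox.shutdown();
        return trySubmit(mailbox, () -> { });
    }

    public static void main(String[] args) {
        System.out.println(submitAfterShutdown());
    }
}
```

A debug-level message keeps normal shutdowns quiet while still leaving a trace if a trigger is dropped unexpectedly.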
pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperator URL: https://github.com/apache/flink/pull/9564#discussion_r319453203 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java ## @@ -305,14 +305,8 @@ public void testWaterMarkUnordered() throws Exception { } private void testEventTime(AsyncDataStream.OutputMode mode) throws Exception { - final AsyncWaitOperator operator = new AsyncWaitOperator<>( - new MyAsyncFunction(), - TIMEOUT, - 2, - mode); - - final OneInputStreamOperatorTestHarness testHarness = - new OneInputStreamOperatorTestHarness<>(operator, IntSerializer.INSTANCE); + final OneInputStreamOperatorTestHarness testHarness = createTestHarness( + new MyAsyncFunction(), TIMEOUT, 2, mode); Review comment: nit: can you restore previous formatting? https://flink.apache.org/contributing/code-style-and-quality-formatting.html#breaking-the-lines-of-too-long-statements > If you break the line then each argument/call should have a separate line, including the first one
pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperator URL: https://github.com/apache/flink/pull/9564#discussion_r319478576 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java ## @@ -261,10 +261,9 @@ public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) throws * Invoked when a checkpoint has been completed, i.e., when the checkpoint coordinator has received Review comment: > [FLINK-12482][runtime] Move CheckpointListener.notifyCheckpointComplete call into StreamTask Why did we need this change? Can you explain the reason behind this commit inside the commit message? Maybe it should be squashed with the following commit?
pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperator URL: https://github.com/apache/flink/pull/9564#discussion_r319467875 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ## @@ -428,6 +429,10 @@ public final void invoke() throws Exception { // See FLINK-7430 isRunning = false; } + MailboxExecutor mainMailboxExecutor = mailboxProcessor.getMainMailboxExecutor(); Review comment: +1
pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperator URL: https://github.com/apache/flink/pull/9564#discussion_r319465635 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ## @@ -64,6 +64,7 @@ import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; Review comment: +1
pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperator URL: https://github.com/apache/flink/pull/9564#discussion_r319463486 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorFactory.java ## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.functions.async.AsyncFunction; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.YieldingOperatorFactory; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxExecutor; + +/** + * The factory of {@link AsyncWaitOperator}. + * + * @param The output type of the operator + */ +public class AsyncWaitOperatorFactory implements OneInputStreamOperatorFactory, YieldingOperatorFactory { + private final AsyncFunction asyncFunction; + private final long timeout; + private final int capacity; + private final AsyncDataStream.OutputMode outputMode; + private MailboxExecutor mailboxExecutor; + private ChainingStrategy strategy = ChainingStrategy.HEAD; + + public AsyncWaitOperatorFactory( + AsyncFunction asyncFunction, + long timeout, + int capacity, + AsyncDataStream.OutputMode outputMode) { + this.asyncFunction = asyncFunction; + this.timeout = timeout; + this.capacity = capacity; + this.outputMode = outputMode; + } + + @Override public void setMailboxExecutor(MailboxExecutor mailboxExecutor) { Review comment: nit: `@Override` on a separate line?
pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperator URL: https://github.com/apache/flink/pull/9564#discussion_r319479865 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ## @@ -1183,8 +1183,12 @@ public void notifyCheckpointComplete(final long checkpointID) { @Override public void run() { try { - invokable.notifyCheckpointComplete(checkpointID); - } catch (Throwable t) { + invokable.notifyCheckpointCompleteAsync(checkpointID); Review comment: ditto about the double "mailbox" pattern?
pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperator URL: https://github.com/apache/flink/pull/9564#discussion_r319463940 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorFactory.java ## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.functions.async.AsyncFunction; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.YieldingOperatorFactory; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxExecutor; + +/** + * The factory of {@link AsyncWaitOperator}. + * + * @param The output type of the operator + */ +public class AsyncWaitOperatorFactory implements OneInputStreamOperatorFactory, YieldingOperatorFactory { + private final AsyncFunction asyncFunction; + private final long timeout; + private final int capacity; + private final AsyncDataStream.OutputMode outputMode; + private MailboxExecutor mailboxExecutor; + private ChainingStrategy strategy = ChainingStrategy.HEAD; + + public AsyncWaitOperatorFactory( + AsyncFunction asyncFunction, + long timeout, + int capacity, + AsyncDataStream.OutputMode outputMode) { + this.asyncFunction = asyncFunction; + this.timeout = timeout; + this.capacity = capacity; + this.outputMode = outputMode; + } + + @Override public void setMailboxExecutor(MailboxExecutor mailboxExecutor) { + this.mailboxExecutor = mailboxExecutor; + } + + @Override public StreamOperator createStreamOperator(StreamTask containingTask, StreamConfig config, + Output output) { + AsyncWaitOperator asyncWaitOperator = new AsyncWaitOperator( + asyncFunction, + timeout, + capacity, + outputMode, + mailboxExecutor); + asyncWaitOperator.setup(containingTask, config, output); Review comment: setup method is no longer needed with 
`AsyncWaitOperatorFactory`, and the whole `AsyncWaitOperatorFactory` could be cleaned up: all fields could be made non-`transient` and `final`. If that is too big a cleanup to do within this PR, can you create a ticket for it?
[GitHub] [flink] pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperat
pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperator URL: https://github.com/apache/flink/pull/9564#discussion_r319464910 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java ## @@ -50,29 +50,21 @@ // - /** The containing task that owns this time service provider. */ - private final AsyncExceptionHandler task; - - /** The lock that timers acquire upon triggering. */ - private final Object checkpointLock; - /** The executor service that schedules and calls the triggers of this task. */ private final ScheduledThreadPoolExecutor timerService; + private final ScheduledCallbackExecutionContext callbackExecutionContext; private final AtomicInteger status; - public SystemProcessingTimeService(AsyncExceptionHandler failureHandler, Object checkpointLock) { - this(failureHandler, checkpointLock, null); + @VisibleForTesting + SystemProcessingTimeService(ScheduledCallbackExecutionContext callbackExecutionContext) { + this(callbackExecutionContext, null); } public SystemProcessingTimeService( - AsyncExceptionHandler task, - Object checkpointLock, - ThreadFactory threadFactory) { - - this.task = checkNotNull(task); - this.checkpointLock = checkNotNull(checkpointLock); + ScheduledCallbackExecutionContext callbackExecutionContext, ThreadFactory threadFactory) { Review comment: nit: ditto about wrapped parameters in one line.
[GitHub] [flink] pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperat
pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperator URL: https://github.com/apache/flink/pull/9564#discussion_r319476565 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ## @@ -1139,17 +1138,9 @@ public void triggerCheckpointBarrier( if (executionState == ExecutionState.RUNNING && invokable != null) { - // build a local closure - final SafetyNetCloseableRegistry safetyNetCloseableRegistry = - FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread(); - Runnable runnable = new Runnable() { @Override public void run() { - // set safety net from the task's context for checkpointing thread Review comment: Are you sure we can remove this? What was the purpose of this safety net? Actually, shouldn't we set it inside the mailbox action? I guess it is set correctly for the main Task's thread, but this `triggerCheckpointAsync` code can be executed from a `LegacySourceThread` as well (because of yielding execution)?
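A minimal sketch of the concern in the comment above, assuming the safety net registry is held per thread (as the `...ForThread` method names in the diff suggest). `ThreadLocalRegistryDemo`, `REGISTRY` and `seenByWorker` are hypothetical names for illustration, not Flink classes. A value installed on the calling thread is not visible on the thread that later runs the action, unless the action installs it itself:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadLocalRegistryDemo {
    // Hypothetical stand-in for a per-thread registry; this is not Flink's implementation.
    private static final ThreadLocal<String> REGISTRY = new ThreadLocal<>();

    /** Returns what the worker thread sees, with or without installing the value inside the action. */
    static String seenByWorker(boolean setInsideAction) {
        REGISTRY.set("task-registry");
        // Captured on the calling thread, like the "local closure" removed in the diff.
        final String captured = REGISTRY.get();
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            Future<String> result = worker.submit(() -> {
                if (setInsideAction) {
                    // Install the captured value on the thread that actually runs the action.
                    REGISTRY.set(captured);
                }
                try {
                    return String.valueOf(REGISTRY.get());
                } finally {
                    REGISTRY.remove();
                }
            });
            return result.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            worker.shutdown();
            REGISTRY.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println(seenByWorker(false)); // the worker thread has no value of its own
        System.out.println(seenByWorker(true));  // the action installed the captured value
    }
}
```

If the action can run on more than one thread (main task thread or a legacy source thread), setting the value inside the action is the only place that is guaranteed to be on the executing thread.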
[GitHub] [flink] pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperat
pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperator URL: https://github.com/apache/flink/pull/9564#discussion_r319470555 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java ## @@ -219,9 +220,12 @@ public ExecutionConfig getExecutionConfig() { * @param advanceToEndOfEventTime Flag indicating if the source should inject a {@code MAX_WATERMARK} in the pipeline * to fire any registered event-time timers * -* @return {@code false} if the checkpoint can not be carried out, {@code true} otherwise +* @return future with value of {@code false} if the checkpoint was not carried out, {@code true} otherwise */ - public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) throws Exception { + public Future triggerCheckpointAsync( + CheckpointMetaData checkpointMetaData, + CheckpointOptions checkpointOptions, + boolean advanceToEndOfEventTime) { Review comment: nit: double indent the parameters or put a new line after `{`.
[GitHub] [flink] pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperat
pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperator URL: https://github.com/apache/flink/pull/9564#discussion_r319461753 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java ## @@ -1127,4 +1073,12 @@ public void testEndInput() throws Exception { } } } + + private static OneInputStreamOperatorTestHarness createTestHarness( + AsyncFunction function, long timeout, int capacity, AsyncDataStream.OutputMode outputMode) throws Exception { Review comment: nit: wrap the parameters?
[GitHub] [flink] pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperat
pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperator URL: https://github.com/apache/flink/pull/9564#discussion_r319464273 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/EmitterTest.java ## @@ -194,4 +205,22 @@ public void testEmitterWithExceptions() throws Exception { emitterThread.interrupt(); } } + + private static class MockExecutor implements MailboxExecutor { + @Override public void execute(@Nonnull Runnable command) throws RejectedExecutionException { Review comment: `@Override` on a separate line?
[GitHub] [flink] pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperat
pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperator URL: https://github.com/apache/flink/pull/9564#discussion_r319468278 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ## @@ -428,6 +429,10 @@ public final void invoke() throws Exception { // See FLINK-7430 isRunning = false; } + MailboxExecutor mainMailboxExecutor = mailboxProcessor.getMainMailboxExecutor(); Review comment: Also, we should have a dedicated unit test for that (unless it's too complicated to set up, but it would be better not to rely on ITCases for this feature).
[GitHub] [flink] pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperat
pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperator URL: https://github.com/apache/flink/pull/9564#discussion_r319472433 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ## @@ -1151,7 +1151,10 @@ public void run() { FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry); try { - invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime); + invokable.triggerCheckpointAsync(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime); Review comment: missing `.get()`?
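A minimal sketch of why the missing `.get()` matters, using only `java.util.concurrent`. `FutureGetDemo` and `triggerAsync` are hypothetical stand-ins, not the Flink method from the diff. A failure carried by a `Future` reaches the surrounding `try`/`catch` only when the result is retrieved:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class FutureGetDemo {
    // Hypothetical stand-in for an asynchronous trigger whose work fails.
    static Future<Boolean> triggerAsync() {
        CompletableFuture<Boolean> future = new CompletableFuture<>();
        future.completeExceptionally(new IllegalStateException("checkpoint failed"));
        return future;
    }

    /** Drops the returned future, so the failure never reaches the catch block. */
    static String callWithoutGet() {
        try {
            triggerAsync();
            return "no error seen";
        } catch (Throwable t) {
            return "caught: " + t.getMessage();
        }
    }

    /** Waits for the result, so the failure is rethrown wrapped in an ExecutionException. */
    static String callWithGet() {
        try {
            triggerAsync().get();
            return "no error seen";
        } catch (ExecutionException e) {
            return "caught: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(callWithoutGet()); // prints "no error seen"
        System.out.println(callWithGet());    // prints "caught: checkpoint failed"
    }
}
```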
[GitHub] [flink] pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperat
pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperator URL: https://github.com/apache/flink/pull/9564#discussion_r319480244 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ## @@ -1183,8 +1183,12 @@ public void notifyCheckpointComplete(final long checkpointID) { @Override public void run() { try { - invokable.notifyCheckpointComplete(checkpointID); - } catch (Throwable t) { + invokable.notifyCheckpointCompleteAsync(checkpointID); Review comment: If we do not wait, won't we miss the error handling? But please first respond to my comment about the double/nested mailbox/enqueuing pattern.
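One possible way to keep the error handling without blocking, sketched under the assumption that the asynchronous call returns a `CompletableFuture` (the diff above does not show the return type). `AsyncErrorCallbackDemo`, `notifyAsync` and `handle` are hypothetical names, and this is not necessarily what the PR ended up doing:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class AsyncErrorCallbackDemo {
    // Hypothetical stand-in for an asynchronous notification that may fail.
    static CompletableFuture<Void> notifyAsync(boolean fail) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        if (fail) {
            future.completeExceptionally(new IllegalStateException("notify failed"));
        } else {
            future.complete(null);
        }
        return future;
    }

    /** Attaches the error handling to the future instead of waiting for it. */
    static String handle(boolean fail) {
        AtomicReference<String> outcome = new AtomicReference<>("ok");
        notifyAsync(fail).whenComplete((ignored, error) -> {
            if (error != null) {
                // This is where the old catch block's failure handling would go.
                outcome.set("handled: " + error.getMessage());
            }
        });
        // The future is already completed here, so the callback has run on this thread.
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println(handle(false)); // prints "ok"
        System.out.println(handle(true));  // prints "handled: notify failed"
    }
}
```

In real code the future would usually complete later on another thread, so the callback, not the caller, has to own the failure reporting.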
[GitHub] [flink] pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperat
pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperator URL: https://github.com/apache/flink/pull/9564#discussion_r319467301 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ## @@ -1368,13 +1369,19 @@ private void checkpointStreamOperator(StreamOperator op) throws Exception { private class TimerInvocationContext implements SystemProcessingTimeService.ScheduledCallbackExecutionContext { @Override - public void invoke(ProcessingTimeCallback callback, long timestamp) { - synchronized (getCheckpointLock()) { - try { - callback.onProcessingTime(timestamp); - } catch (Throwable t) { - handleAsyncException("Caught exception while processing timer.", new TimerException(t)); - } + public void invoke(ProcessingTimeCallback callback, long timestamp) throws InterruptedException { + try { + mailboxProcessor.getMailboxExecutor(TaskMailbox.MAX_PRIORITY).execute(() -> { + synchronized (getCheckpointLock()) { + try { + callback.onProcessingTime(timestamp); + } catch (Throwable t) { + handleAsyncException("Caught exception while processing timer.", new TimerException(t)); + } + } + }); + } catch (Throwable t) { Review comment: Unfortunately not; I think this is the uppermost layer in the stack trace. If we do not catch an exception here, it will be silently ignored. This made the deadlock that I was debugging last week non-obvious to find. However, maybe this deserves a comment here explaining this construct? Something like ``` // Inner try catch handles all errors during the execution of the action in the mailbox. Outer try catch handles errors that could happen during enqueuing the action. ```
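A minimal sketch of the nested construct described in the comment above, using a plain `java.util.concurrent.Executor` in place of the mailbox executor. `EnqueueErrorDemo` and its methods are hypothetical names for illustration. The inner `try`/`catch` sees failures of the action itself, the outer one sees failures of handing the action to the executor:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

public class EnqueueErrorDemo {
    /** Runs the action through the executor and returns the errors reported by each layer. */
    static List<String> invoke(Executor executor, Runnable action) {
        List<String> reported = new ArrayList<>();
        try {
            executor.execute(() -> {
                try {
                    action.run();
                } catch (Throwable t) {
                    // Inner catch: errors during the execution of the action.
                    reported.add("execution: " + t.getMessage());
                }
            });
        } catch (Throwable t) {
            // Outer catch: errors while enqueuing the action.
            reported.add("enqueue: " + t.getMessage());
        }
        return reported;
    }

    /** The action is accepted and then fails while running. */
    static List<String> demoExecutionFailure() {
        Executor direct = Runnable::run; // runs the command immediately on the calling thread
        return invoke(direct, () -> {
            throw new IllegalStateException("timer failed");
        });
    }

    /** The executor refuses the action, as a closed queue might. */
    static List<String> demoEnqueueFailure() {
        Executor closed = command -> {
            throw new RejectedExecutionException("mailbox closed");
        };
        return invoke(closed, () -> { });
    }

    public static void main(String[] args) {
        System.out.println(demoExecutionFailure()); // prints [execution: timer failed]
        System.out.println(demoEnqueueFailure());   // prints [enqueue: mailbox closed]
    }
}
```

Without the outer catch, the second case would propagate to whatever thread called `invoke`, which for a timer thread can mean the error is never reported.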