[GitHub] [flink] tillrohrmann commented on a change in pull request #12264: [FLINK-17558][netty] Release partitions asynchronously

2020-05-27 Thread GitBox


tillrohrmann commented on a change in pull request #12264:
URL: https://github.com/apache/flink/pull/12264#discussion_r431091900



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
##
@@ -366,11 +367,15 @@ public static TaskExecutor startTaskManager(
resourceID,

taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval());
 
+   final ExecutorService ioExecutor = 
Executors.newCachedThreadPool(
+   taskManagerServicesConfiguration.getNumIoThreads(),
+   new ExecutorThreadFactory("flink-taskexecutor-io"));

Review comment:
   But we can also keep it as is. The important aspect is that everyone 
knows that `fromConfiguration` takes ownership over the passed `ioExecutor`.
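
To illustrate what "takes ownership" could look like, here is a minimal JDK-only sketch. `OwningServices` is a hypothetical stand-in for `TaskManagerServices`, not the actual Flink class; the point is only that whoever receives the executor is also the one who shuts it down.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a services holder that owns the executor it is given.
final class OwningServices implements AutoCloseable {
    private final ExecutorService ioExecutor;

    private OwningServices(ExecutorService ioExecutor) {
        this.ioExecutor = ioExecutor;
    }

    // The factory takes ownership: after this call the caller must not
    // shut the executor down itself; close() is responsible for that.
    static OwningServices fromConfiguration(ExecutorService ioExecutor) {
        return new OwningServices(ioExecutor);
    }

    ExecutorService getIoExecutor() {
        return ioExecutor;
    }

    @Override
    public void close() {
        ioExecutor.shutdown();
    }
}
```

With this shape, the caller creates the pool, hands it over, and only ever calls `close()` on the holder.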





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] tillrohrmann commented on a change in pull request #12264: [FLINK-17558][netty] Release partitions asynchronously

2020-05-27 Thread GitBox


tillrohrmann commented on a change in pull request #12264:
URL: https://github.com/apache/flink/pull/12264#discussion_r431090914



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
##
@@ -366,11 +367,15 @@ public static TaskExecutor startTaskManager(
resourceID,

taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval());
 
+   final ExecutorService ioExecutor = 
Executors.newCachedThreadPool(
+   taskManagerServicesConfiguration.getNumIoThreads(),
+   new ExecutorThreadFactory("flink-taskexecutor-io"));

Review comment:
   Jup. It is also a bit odd that 
`TaskExecutorLocalStateStoresManagerTest.testCreationFromConfig` needs to 
create a `TaskManagerServices` instance in order to test the 
`TaskExecutorLocalStateStoresManager`. I think one could test the same if one 
factored the creation of the `TaskExecutorLocalStateStoresManager` out into a 
separate method which is called by `TaskManagerServices.fromConfiguration()`.









[GitHub] [flink] tillrohrmann commented on a change in pull request #12264: [FLINK-17558][netty] Release partitions asynchronously

2020-05-27 Thread GitBox


tillrohrmann commented on a change in pull request #12264:
URL: https://github.com/apache/flink/pull/12264#discussion_r431066056



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
##
@@ -270,7 +271,8 @@ public static TaskManagerServicesConfiguration 
fromConfiguration(
 
final String[] alwaysParentFirstLoaderPatterns = 
CoreOptions.getParentFirstLoaderPatterns(configuration);
 
-   final int numIoThreads = 
configuration.get(TaskManagerOptions.NUM_IO_THREADS);
+   // multiply core-count to be on the safer side, since we used a 
pool with size=64 in the past
+   final int numIoThreads = 
ClusterEntrypointUtils.getPoolSize(configuration) * 4;

Review comment:
   ```suggestion
final int numIoThreads = 
ClusterEntrypointUtils.getPoolSize(configuration);
   ```
   
   I will update `getPoolSize` to return the new default value of `4 * cores` 
as part of changing the type of the thread pool on the JM side.
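
A rough sketch of the intended default, assuming a hypothetical helper `poolSize` (this is not the actual `ClusterEntrypointUtils.getPoolSize` implementation): an explicitly configured value wins, otherwise the size falls back to `4 * cores`.

```java
import java.util.OptionalInt;

final class PoolSizeSketch {
    // Hypothetical helper: use the configured size if present,
    // otherwise default to four times the number of available cores.
    static int poolSize(OptionalInt configured, int availableCores) {
        if (configured.isPresent()) {
            int size = configured.getAsInt();
            if (size <= 0) {
                throw new IllegalArgumentException("pool size must be positive: " + size);
            }
            return size;
        }
        return 4 * availableCores;
    }
}
```

On a 16-core machine without an explicit setting this yields 64 threads, which matches the pool size used in the past.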

##
File path: 
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/executor/TestExecutorResource.java
##
@@ -19,35 +19,37 @@
 
 import org.junit.rules.ExternalResource;
 
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Supplier;
 
 /**
- * Resource which starts an {@link ExecutorService} for testing purposes.
+ * Resource which starts/stops an {@link ExecutorService} for testing purposes.
  */
-public class TestExecutorServiceResource extends ExternalResource {
+public class TestExecutorResource extends ExternalResource {
 
private final Supplier serviceFactory;
 
-   private ExecutorService executorService;
+   private ExecutorService executor;

Review comment:
   The name `executorService` could have been kept.

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
##
@@ -366,11 +367,15 @@ public static TaskExecutor startTaskManager(
resourceID,

taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval());
 
+   final ExecutorService ioExecutor = 
Executors.newCachedThreadPool(
+   taskManagerServicesConfiguration.getNumIoThreads(),
+   new ExecutorThreadFactory("flink-taskexecutor-io"));

Review comment:
   This could be moved into the `fromConfiguration` method. Given that the 
`TaskManagerServices` is responsible for managing the created 
`ExecutorService`, I think it is fine to move the creation into the 
`fromConfiguration` method.









[GitHub] [flink] tillrohrmann commented on a change in pull request #12264: [FLINK-17558][netty] Release partitions asynchronously

2020-05-27 Thread GitBox


tillrohrmann commented on a change in pull request #12264:
URL: https://github.com/apache/flink/pull/12264#discussion_r430897493



##
File path: 
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/executor/TestExecutorServiceResource.java
##
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.executor;
+
+import org.junit.rules.ExternalResource;
+
+import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
+
+/**
+ * Resource which starts an {@link ExecutorService} for testing purposes.
+ */
+public class TestExecutorServiceResource extends ExternalResource {

Review comment:
   Can we unify them and get rid of one of them?









[GitHub] [flink] tillrohrmann commented on a change in pull request #12264: [FLINK-17558][netty] Release partitions asynchronously

2020-05-27 Thread GitBox


tillrohrmann commented on a change in pull request #12264:
URL: https://github.com/apache/flink/pull/12264#discussion_r430897263



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
##
@@ -265,10 +265,15 @@ public static TaskManagerServices fromConfiguration(
// start the I/O manager, it will create some temp directories.
final IOManager ioManager = new 
IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());
 
+   final ExecutorService ioExecutor = Executors.newFixedThreadPool(

Review comment:
   I think it is actually a good idea to move it away from the 
`RpcService`'s executor because this is the same pool that is used to run the 
`RpcEndpoints`.
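
The reasoning can be sketched with plain JDK executors. The names `rpcExecutor` and `ioExecutor` below are illustrative only and do not refer to the real Flink fields: a blocked I/O task on its own pool does not prevent the other executor from making progress.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class SeparatePoolsSketch {
    // Returns true if the "rpc" executor stays responsive while an I/O task is blocked.
    static boolean rpcStaysResponsive() throws Exception {
        ExecutorService rpcExecutor = Executors.newSingleThreadExecutor();
        ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
        CountDownLatch unblockIo = new CountDownLatch(1);
        try {
            // The slow I/O operation runs on its own pool and blocks until released.
            Future<?> io = ioExecutor.submit(() -> {
                unblockIo.await();
                return null;
            });
            // The rpc thread is unaffected by the blocked I/O task.
            Future<String> rpc = rpcExecutor.submit(() -> "pong");
            String reply = rpc.get(5, TimeUnit.SECONDS);

            // Release the I/O task and wait for it to finish.
            unblockIo.countDown();
            io.get(5, TimeUnit.SECONDS);
            return "pong".equals(reply);
        } finally {
            unblockIo.countDown();
            rpcExecutor.shutdownNow();
            ioExecutor.shutdownNow();
        }
    }
}
```

If both tasks were submitted to the same single-threaded executor instead, the second one would have to wait behind the blocked one.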









[GitHub] [flink] tillrohrmann commented on a change in pull request #12264: [FLINK-17558][netty] Release partitions asynchronously

2020-05-26 Thread GitBox


tillrohrmann commented on a change in pull request #12264:
URL: https://github.com/apache/flink/pull/12264#discussion_r430381100



##
File path: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
##
@@ -490,6 +490,13 @@
+ " size will be used. The exact size of JVM 
Overhead can be explicitly specified by setting the min/max"
+ " size to the same value.");
 
+   @Documentation.ExcludeFromDocumentation("This option just serves as a 
last-ditch escape hatch.")
+   public static final ConfigOption<Integer> NUM_IO_THREADS =
+   key("taskmanager.io.threads.num")
+   .intType()
+   .defaultValue(2)
+   .withDescription("The number of threads to use for 
non-critical IO operations.");

Review comment:
   The `HighAvailabilityServices` use the `ioExecutor` for I/O tasks. 
Concretely, they use it to dispose of a completed checkpoint. In that sense it is 
also an I/O operation and I think they could be served by the same executor. 
But in order to keep the scope smaller, we don't have to do it in this PR.

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
##
@@ -265,10 +265,15 @@ public static TaskManagerServices fromConfiguration(
// start the I/O manager, it will create some temp directories.
final IOManager ioManager = new 
IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());
 
+   final ExecutorService ioExecutor = Executors.newFixedThreadPool(

Review comment:
   I think we should really unify them. `taskIOExecutor` is effectively the 
same as `ioExecutor` because it is used by the 
`TaskExecutorLocalStateStoresManager` for discarding local state. I think it is 
better to use common thread pools until we have a really good reason for 
separating thread pools. Creating more thread pools will also increase the 
required resource footprint.
   
   Concerning the number of threads, I would suggest using 
`ClusterOptions.CLUSTER_IO_EXECUTOR_POOL_SIZE` which defaults to the number of 
available cores if it has not been specified. This will also allow users to 
increase the thread pool if things take too long with a single thread.

##
File path: 
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/executor/TestExecutorServiceResource.java
##
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.executor;
+
+import org.junit.rules.ExternalResource;
+
+import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
+
+/**
+ * Resource which starts an {@link ExecutorService} for testing purposes.
+ */
+public class TestExecutorServiceResource extends ExternalResource {
+
+   private final Supplier<ExecutorService> serviceFactory;
+
+   private ExecutorService executorService;
+
+   public TestExecutorServiceResource(Supplier<ExecutorService> 
serviceFactory) {
+   this.serviceFactory = serviceFactory;
+   }
+
+   @Override
+   protected void before() throws Throwable {
+   executorService = serviceFactory.get();
+   }
+
+   public ExecutorService getExecutorService() {
+   return executorService;
+   }
+
+   @Override
+   protected void after() {
+   if (executorService != null) {
+   executorService.shutdown();

Review comment:
   I would suggest using `ExecutorUtils.gracefulShutdown`.
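
For context, a graceful shutdown usually follows the pattern from the `ExecutorService` Javadoc: stop accepting work, wait for a bounded time, then interrupt whatever is still running. The sketch below is a plain-JDK approximation with a hypothetical method name; it is not a copy of the Flink utility.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class GracefulShutdownSketch {
    // Returns true if the executor terminated within the given timeouts.
    static boolean shutdownGracefully(ExecutorService executor, long timeout, TimeUnit unit) {
        // Stop accepting new tasks; already submitted tasks keep running.
        executor.shutdown();
        try {
            if (!executor.awaitTermination(timeout, unit)) {
                // Tasks did not finish in time: interrupt them and wait once more.
                executor.shutdownNow();
                return executor.awaitTermination(timeout, unit);
            }
            return true;
        } catch (InterruptedException e) {
            // Preserve the interrupt status and force the shutdown.
            executor.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Compared to a bare `shutdown()`, this ensures the test rule does not return while worker threads are still alive.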

##
File path: 
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/executor/TestExecutorServiceResource.java
##
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may 

[GitHub] [flink] tillrohrmann commented on a change in pull request #12264: [FLINK-17558][netty] Release partitions asynchronously

2020-05-20 Thread GitBox


tillrohrmann commented on a change in pull request #12264:
URL: https://github.com/apache/flink/pull/12264#discussion_r428047035



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
##
@@ -280,7 +273,65 @@ public void testClusterPartitionRelease() throws Exception 
{
);
}
 
-   private  void testPartitionRelease(PartitionTrackerSetup 
partitionTrackerSetup, TestAction testAction) throws Exception {
+   @Test
+   public void testBlockingLocalPartitionReleaseDoesNotBlockTaskExecutor() 
throws Exception {
+   BlockerSync sync = new BlockerSync();
+   ResultPartitionManager blockingResultPartitionManager = new 
ResultPartitionManager() {
+   @Override
+   public void releasePartition(ResultPartitionID 
partitionId, Throwable cause) {
+   sync.blockNonInterruptible();
+   super.releasePartition(partitionId, cause);
+   }
+   };
+
+   NettyShuffleEnvironment shuffleEnvironment = new 
NettyShuffleEnvironmentBuilder()
+   
.setResultPartitionManager(blockingResultPartitionManager)
+   
.setIoExecutor(java.util.concurrent.Executors.newFixedThreadPool(1))

Review comment:
   I would suggest also shutting this executor service down at the end of 
the test. It might be necessary to unblock the release operation first for this.
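
Something along these lines, as a JDK-only sketch in which `CountDownLatch` stands in for `BlockerSync` and the class and method names are made up for illustration: the `finally` block first releases the blocked operation and only then shuts the executor down, so the worker thread can actually exit.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class BlockingReleaseTestSketch {
    // Returns true if the executor terminated cleanly after the test body.
    static boolean run() throws Exception {
        ExecutorService ioExecutor = Executors.newFixedThreadPool(1);
        CountDownLatch blocked = new CountDownLatch(1); // task reached its blocking point
        CountDownLatch release = new CountDownLatch(1); // lets the task continue
        try {
            ioExecutor.execute(() -> {
                blocked.countDown();
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // Wait until the "release operation" is blocked; test assertions would go here.
            if (!blocked.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("task never started");
            }
        } finally {
            // Unblock first, otherwise shutdown() would wait on a task that never finishes.
            release.countDown();
            ioExecutor.shutdown();
        }
        return ioExecutor.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```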

##
File path: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
##
@@ -490,6 +490,13 @@
+ " size will be used. The exact size of JVM 
Overhead can be explicitly specified by setting the min/max"
+ " size to the same value.");
 
+   @Documentation.ExcludeFromDocumentation("This option just serves as a 
last-ditch escape hatch.")
+   public static final ConfigOption<Integer> NUM_IO_THREADS =
+   key("taskmanager.io.threads.num")
+   .intType()
+   .defaultValue(2)
+   .withDescription("The number of threads to use for 
non-critical IO operations.");

Review comment:
   We might be able to unify this configuration option with 
`ClusterOptions.CLUSTER_IO_EXECUTOR_POOL_SIZE`.

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
##
@@ -265,10 +265,15 @@ public static TaskManagerServices fromConfiguration(
// start the I/O manager, it will create some temp directories.
final IOManager ioManager = new 
IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());
 
+   final ExecutorService ioExecutor = Executors.newFixedThreadPool(

Review comment:
   Can the `ioExecutor` also replace the `taskIOExecutor`?

##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
##
@@ -100,6 +105,27 @@ public void testRegisterTaskWithInsufficientBuffers() 
throws Exception {
testRegisterTaskWithLimitedBuffers(bufferCount);
}
 
+   @Test
+   public void testSlowIODoesNotBlockRelease() throws Exception {
+   BlockerSync sync = new BlockerSync();

Review comment:
   I guess a `OneShotLatch` would also work here if the test threads call 
the trigger on it.




