asfgit closed pull request #6839: [FLINK-10253] Run MetricQueryService with 
lower priority
URL: https://github.com/apache/flink/pull/6839
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/_includes/generated/metric_configuration.html 
b/docs/_includes/generated/metric_configuration.html
index 5c073f3ad59..420bb7f1da2 100644
--- a/docs/_includes/generated/metric_configuration.html
+++ b/docs/_includes/generated/metric_configuration.html
@@ -12,6 +12,11 @@
             <td style="word-wrap: break-word;">"0"</td>
             <td>The port range used for Flink's internal metric query service. 
Accepts a list of ports (“50100,50101”), ranges(“50100-50200”) or a combination 
of both. It is recommended to set a range of ports to avoid collisions when 
multiple Flink components are running on the same machine. Per default Flink 
will pick a random port.</td>
         </tr>
+        <tr>
+            <td><h5>metrics.internal.query-service.thread-priority</h5></td>
+            <td style="word-wrap: break-word;">1</td>
+            <td>The thread priority used for Flink's internal metric query 
service. The thread is created by Akka's thread pool executor. The range of the 
priority is from 1 (MIN_PRIORITY) to 10 (MAX_PRIORITY). Warning, increasing 
this value may bring the main Flink components down.</td>
+        </tr>
         <tr>
             <td><h5>metrics.latency.granularity</h5></td>
             <td style="word-wrap: break-word;">"operator"</td>
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
index 0e7268ee052..0785b347335 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
@@ -156,6 +156,18 @@
                        "ports to avoid collisions when multiple Flink 
components are running on the same machine. Per default " +
                        "Flink will pick a random port.");
 
+       /**
+        * The thread priority for Flink's internal metric query service. 
{@code 1} means the minimum priority and
+        * {@code 10} means the maximum priority.
+        */
+       public static final ConfigOption<Integer> QUERY_SERVICE_THREAD_PRIORITY 
=
+               key("metrics.internal.query-service.thread-priority")
+               .defaultValue(1)
+               .withDescription("The thread priority used for Flink's internal 
metric query service. The thread is created" +
+                       " by Akka's thread pool executor. " +
+                       "The range of the priority is from 1 (MIN_PRIORITY) to 
10 (MAX_PRIORITY). " +
+                       "Warning, increasing this value may bring the main 
Flink components down.");
+
        private MetricOptions() {
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index 00b61737d20..430af98bc2e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -24,6 +24,7 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -290,7 +291,8 @@ private static Config 
getExecutorConfigByExecutorMode(Configuration configuratio
                        case FORK_JOIN_EXECUTOR:
                                return 
AkkaUtils.getForkJoinExecutorConfig(configuration);
                        case FIXED_THREAD_POOL_EXECUTOR:
-                               return AkkaUtils.getThreadPoolExecutorConfig();
+                               return AkkaUtils.getThreadPoolExecutorConfig(
+                                       
configuration.getInteger(MetricOptions.QUERY_SERVICE_THREAD_PRIORITY));
                        default:
                                throw new 
IllegalArgumentException(String.format("Unknown ActorSystemExecutorMode %s.", 
executorMode));
                }
diff --git 
a/flink-runtime/src/main/scala/akka/dispatch/PriorityThreadFactory.scala 
b/flink-runtime/src/main/scala/akka/dispatch/PriorityThreadFactory.scala
new file mode 100644
index 00000000000..d6f6d76ec51
--- /dev/null
+++ b/flink-runtime/src/main/scala/akka/dispatch/PriorityThreadFactory.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package akka.dispatch
+
+import java.util.concurrent.ThreadFactory
+
+/**
+  * Composition over the [[DispatcherPrerequisites.threadFactory]] that sets the 
priority
+  * of newly created threads.
+  *
+  * @param newThreadPriority priority that will be set to each newly created 
thread
+  *                          should be between Thread.MIN_PRIORITY and 
Thread.MAX_PRIORITY
+  */
+class PriorityThreadFactory(
+  prerequisites: DispatcherPrerequisites,
+  newThreadPriority: Int) extends ThreadFactory {
+    override def newThread(r: Runnable): Thread = {
+      val newThread = prerequisites.threadFactory.newThread(r)
+      newThread.setPriority(newThreadPriority)
+      newThread
+    }
+}
diff --git 
a/flink-runtime/src/main/scala/akka/dispatch/PriorityThreadsDispatcher.scala 
b/flink-runtime/src/main/scala/akka/dispatch/PriorityThreadsDispatcher.scala
new file mode 100644
index 00000000000..06ef001f1a7
--- /dev/null
+++ b/flink-runtime/src/main/scala/akka/dispatch/PriorityThreadsDispatcher.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package akka.dispatch
+
+import com.typesafe.config.Config
+
+/**
+  * Akka Dispatcher that creates threads with a configurable priority.
+  *
+  * Example of configuration:
+  *
+  *   low-priority-threads-dispatcher {
+  *     type = akka.dispatch.PriorityThreadsDispatcher
+  *     executor = "thread-pool-executor"
+  *     # should be between Thread.MIN_PRIORITY (which is 1) and 
Thread.MAX_PRIORITY (which is 10)
+  *     thread-priority = 1
+  *     thread-pool-executor {
+  *       core-pool-size-min = 0
+  *       core-pool-size-factor = 2.0
+  *       core-pool-size-max = 10
+  *     }
+  *   }
+  *
+  * The two-argument constructor (the primary constructor) is automatically 
called by Akka
+  * when it finds:
+  *   abcde-dispatcher {
+  *     type = akka.dispatch.PriorityThreadsDispatcher <-- the class that Akka 
will instantiate
+  *     ...
+  *   }
+  *
+  * @param config passed automatically by Akka; should contain information 
about the thread priority
+  * @param prerequisites passed automatically by Akka
+  */
+class PriorityThreadsDispatcher(config: Config, prerequisites: 
DispatcherPrerequisites)
+  extends DispatcherConfigurator(
+    config,
+    new PriorityThreadsDispatcherPrerequisites(
+      prerequisites,
+      config.getInt(PriorityThreadsDispatcher.threadPriorityConfigKey)
+    )
+  )
+
+object PriorityThreadsDispatcher {
+  /**
+    * Configuration key under which int value should be placed.
+    */
+  val threadPriorityConfigKey = "thread-priority"
+}
diff --git 
a/flink-runtime/src/main/scala/akka/dispatch/PriorityThreadsDispatcherPrerequisites.scala
 
b/flink-runtime/src/main/scala/akka/dispatch/PriorityThreadsDispatcherPrerequisites.scala
new file mode 100644
index 00000000000..a62f12185f6
--- /dev/null
+++ 
b/flink-runtime/src/main/scala/akka/dispatch/PriorityThreadsDispatcherPrerequisites.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package akka.dispatch
+
+/**
+  * Composition over [[DefaultDispatcherPrerequisites]] that replaces the thread 
factory with one that
+  * allows the thread priority to be configured.
+  *
+  * @param newThreadPriority priority that will be set to each newly created 
thread
+  *                          should be between Thread.MIN_PRIORITY and 
Thread.MAX_PRIORITY
+  */
+class PriorityThreadsDispatcherPrerequisites(
+  prerequisites: DispatcherPrerequisites,
+  newThreadPriority: Int) extends DispatcherPrerequisites {
+
+  private val defaultDispatcherPrerequisites : DefaultDispatcherPrerequisites =
+    new DefaultDispatcherPrerequisites(
+      eventStream = prerequisites.eventStream,
+      scheduler = prerequisites.scheduler,
+      dynamicAccess = prerequisites.dynamicAccess,
+      settings = prerequisites.settings,
+      mailboxes = prerequisites.mailboxes,
+      defaultExecutionContext = prerequisites.defaultExecutionContext,
+      threadFactory = new PriorityThreadFactory(prerequisites, 
newThreadPriority)
+  )
+
+  override def threadFactory : java.util.concurrent.ThreadFactory = {
+    defaultDispatcherPrerequisites.threadFactory
+  }
+
+  override def eventStream : akka.event.EventStream = {
+    defaultDispatcherPrerequisites.eventStream
+  }
+
+  override def scheduler : akka.actor.Scheduler = {
+    defaultDispatcherPrerequisites.scheduler
+  }
+
+  override def dynamicAccess : akka.actor.DynamicAccess = {
+    defaultDispatcherPrerequisites.dynamicAccess
+  }
+
+  override def settings : akka.actor.ActorSystem.Settings = {
+    defaultDispatcherPrerequisites.settings
+  }
+
+  override def mailboxes : akka.dispatch.Mailboxes = {
+    defaultDispatcherPrerequisites.mailboxes
+  }
+
+  override def defaultExecutionContext : 
scala.Option[scala.concurrent.ExecutionContext] = {
+    defaultDispatcherPrerequisites.defaultExecutionContext
+  }
+}
+
+object PriorityThreadsDispatcherPrerequisites {
+  def apply(prerequisites: DispatcherPrerequisites, newThreadPriority: Int):
+    PriorityThreadsDispatcherPrerequisites =
+      new PriorityThreadsDispatcherPrerequisites(prerequisites, 
newThreadPriority)
+}
+
+
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index de2f35ef8fd..80c17cc7bfd 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -26,7 +26,7 @@ import akka.actor._
 import akka.pattern.{ask => akkaAsk}
 import com.typesafe.config.{Config, ConfigFactory}
 import org.apache.flink.api.common.time.Time
-import org.apache.flink.configuration.{AkkaOptions, Configuration, 
IllegalConfigurationException, SecurityOptions}
+import org.apache.flink.configuration._
 import org.apache.flink.runtime.concurrent.FutureUtils
 import org.apache.flink.runtime.net.SSLUtils
 import org.apache.flink.util.NetUtils
@@ -291,12 +291,20 @@ object AkkaUtils {
     ConfigFactory.parseString(config)
   }
 
-  def getThreadPoolExecutorConfig: Config = {
+  def getThreadPoolExecutorConfig(threadPriority: Int): Config = {
+    if (threadPriority < Thread.MIN_PRIORITY || threadPriority > 
Thread.MAX_PRIORITY) {
+      throw new IllegalConfigurationException("The threadPriority must be 
between "
+        + Thread.MIN_PRIORITY + " and " + Thread.MAX_PRIORITY +
+        ", but it is " + threadPriority)
+    }
+
     val configString = s"""
        |akka {
        |  actor {
        |    default-dispatcher {
+       |      type = akka.dispatch.PriorityThreadsDispatcher
        |      executor = "thread-pool-executor"
+       |      thread-priority = $threadPriority
        |      thread-pool-executor {
        |        core-pool-size-min = 2
        |        core-pool-size-factor = 2.0
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
index e5c1668df0a..ae6209abdd9 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
@@ -20,14 +20,17 @@ package org.apache.flink.runtime.akka
 
 import java.net.{InetAddress, InetSocketAddress}
 
-import org.apache.flink.configuration.{AkkaOptions, Configuration, 
IllegalConfigurationException}
+import org.apache.flink.configuration.{AkkaOptions, Configuration, 
IllegalConfigurationException, MetricOptions}
 import 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution
+import org.apache.flink.runtime.metrics.util.MetricUtils
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils.AkkaProtocol
 import org.apache.flink.util.NetUtils
+import org.junit.Assert.assertEquals
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
+import org.slf4j.LoggerFactory
 
 @RunWith(classOf[JUnitRunner])
 class AkkaUtilsTest
@@ -184,13 +187,44 @@ class AkkaUtilsTest
   }
 
   test("getAkkaConfig respects executor config") {
-    val akkaConfig = AkkaUtils.getAkkaConfig(
+    var akkaConfig = AkkaUtils.getAkkaConfig(
       new Configuration(),
       "localhost",
       1234,
-      AkkaUtils.getThreadPoolExecutorConfig)
+      AkkaUtils.getThreadPoolExecutorConfig(Thread.MIN_PRIORITY))
 
     akkaConfig.getString("akka.actor.default-dispatcher.executor") should
       equal("thread-pool-executor")
+
+    akkaConfig.getInt("akka.actor.default-dispatcher.thread-priority") should
+      equal(Thread.MIN_PRIORITY)
+
+    akkaConfig = AkkaUtils.getAkkaConfig(
+      new Configuration(),
+      "localhost",
+      1234,
+      AkkaUtils.getThreadPoolExecutorConfig(Thread.MAX_PRIORITY))
+
+    akkaConfig.getInt("akka.actor.default-dispatcher.thread-priority") should
+      equal(Thread.MAX_PRIORITY)
+  }
+
+  test("thread priority for metrics ActorSystem ") {
+    var actorSystem = MetricUtils.startMetricsActorSystem(
+      new Configuration, "localhost", LoggerFactory.getLogger("AkkaUtilsTest"))
+    //test default thread priority
+    val defaultThreadPriority = actorSystem.settings.config.getInt(
+      "akka.actor.default-dispatcher.thread-priority")
+    //check default value
+    assertEquals(Thread.MIN_PRIORITY, defaultThreadPriority)
+
+    val config = new Configuration()
+    config.setInteger(MetricOptions.QUERY_SERVICE_THREAD_PRIORITY, 
Thread.MAX_PRIORITY)
+    actorSystem = MetricUtils.startMetricsActorSystem(
+      config, "localhost", LoggerFactory.getLogger("AkkaUtilsTest"))
+    val threadPriority = actorSystem.settings.config.getInt(
+      "akka.actor.default-dispatcher.thread-priority")
+    //check config value
+    assertEquals(Thread.MAX_PRIORITY, threadPriority)
   }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to