Add debug guards to suppress string generation, which caused a significant performance loss.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/dd9a1ba4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/dd9a1ba4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/dd9a1ba4 Branch: refs/heads/master Commit: dd9a1ba43579f18bf536442ae3f5854c64a3791f Parents: f726e55 Author: Till Rohrmann <[email protected]> Authored: Wed Dec 10 10:50:45 2014 +0100 Committer: Till Rohrmann <[email protected]> Committed: Thu Dec 18 18:58:31 2014 +0100 ---------------------------------------------------------------------- .../flink/configuration/ConfigConstants.java | 2 +- .../apache/flink/runtime/ActorLogMessages.scala | 8 ++++-- .../apache/flink/runtime/akka/AkkaUtils.scala | 6 ++++ .../flink/runtime/jobmanager/JobManager.scala | 30 +++++++++++++------- .../flink/runtime/taskmanager/TaskManager.scala | 14 +++++++-- .../runtime/testingUtils/TestingCluster.scala | 8 ------ .../flink/test/util/AbstractTestBase.java | 1 + .../test/util/ForkableFlinkMiniCluster.scala | 4 --- .../src/test/resources/log4j-test.properties | 2 +- pom.xml | 12 ++++---- 10 files changed, 51 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd9a1ba4/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index 15b76f7..c615821 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -586,7 +586,7 @@ public final class ConfigConstants { public static String DEFAULT_AKKA_WATCH_HEARTBEAT_INTERVAL = "5000 ms"; - 
public static String DEFAULT_AKKA_WATCH_HEARTBEAT_PAUSE = "1 min"; + public static String DEFAULT_AKKA_WATCH_HEARTBEAT_PAUSE = "1 m"; public static double DEFAULT_AKKA_WATCH_THRESHOLD = 12; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd9a1ba4/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala index b39c11d..892e68d 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala @@ -30,11 +30,15 @@ trait ActorLogMessages { override def isDefinedAt(x: Any): Boolean = _receiveWithLogMessages.isDefinedAt(x) override def apply(x: Any):Unit = { - log.debug("Received message {} from {}.", x, self.sender) + if(log.isDebugEnabled) { + log.debug(s"Received message ${x} from ${self.sender}.") + } val start = System.nanoTime() _receiveWithLogMessages(x) val duration = (System.nanoTime() - start) / 1000000 - log.debug(s"Handled message {} in {} ms from {}.", x, duration, self.sender) + if(log.isDebugEnabled) { + log.debug(s"Handled message ${x} in ${duration} ms from ${self.sender}.") + } } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd9a1ba4/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala index 0c5405e..b23f620 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala @@ -113,7 +113,13 @@ object AkkaUtils { | 
| actor{ | default-dispatcher{ + | executor = "default-executor" + | | throughput = ${akkaThroughput} + | + | fork-join-executor { + | parallelism-factor = 2.0 + | } | } | } | http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd9a1ba4/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index eb0d79c..6dbcb67 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -92,7 +92,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala { instanceManager.addInstanceListener(scheduler) - log.info(s"Started job manager. Waiting for incoming messages.") + log.info("Started job manager. 
Waiting for incoming messages.") override def postStop(): Unit = { log.info(s"Stopping job manager ${self.path}.") @@ -128,7 +128,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala { " null.")) } else { - log.info(s"Received job ${jobGraph.getJobID} (${jobGraph.getName}}).") + log.info(s"Received job ${jobGraph.getJobID} (${jobGraph.getName}).") // Create the user code class loader libraryCacheManager.registerJob(jobGraph.getJobID, jobGraph.getUserJarBlobKeys) @@ -153,8 +153,10 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala { throw new JobException("The user code class loader could not be initialized.") } - log.debug(s"Running master initialization of job ${jobGraph.getJobID} (${jobGraph - .getName}).") + if(log.isDebugEnabled) { + log.debug(s"Running master initialization of job ${jobGraph.getJobID} (${jobGraph + .getName}}).") + } for (vertex <- jobGraph.getVertices) { val executableClass = vertex.getInvokableClassName @@ -169,13 +171,17 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala { // topological sorting of the job vertices val sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources - log.debug(s"Adding ${sortedTopology.size()} vertices from job graph ${jobGraph - .getJobID} (${jobGraph.getName}).") + if(log.isDebugEnabled) { + log.debug(s"Adding ${sortedTopology.size()} vertices from job graph ${jobGraph + .getJobID} (${jobGraph.getName}).") + } executionGraph.attachJobGraph(sortedTopology) - log.debug(s"Successfully created execution graph from job graph ${jobGraph.getJobID} " + - s"(${jobGraph.getName}).") + if(log.isDebugEnabled) { + log.debug(s"Successfully created execution graph from job graph ${jobGraph.getJobID} " + + s"(${jobGraph.getName}).") + } executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling) @@ -261,7 +267,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala { val execution = executionGraph.getRegisteredExecutions().get(executionAttempt) 
if(execution == null){ - log.error("Can not find Execution for attempt " + executionAttempt) + log.error(s"Can not find Execution for attempt ${executionAttempt}.") null }else{ val slot = execution.getAssignedResource @@ -289,7 +295,9 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala { null } - log.debug("Send next input split {}.", nextInputSplit) + if(log.isDebugEnabled) { + log.debug(s"Send next input split ${nextInputSplit}.") + } sender() ! NextInputSplit(nextInputSplit) } @@ -297,7 +305,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala { currentJobs.get(jobID) match { case Some((executionGraph, jobInfo)) => executionGraph.getJobName log.info(s"Status of job ${jobID} (${executionGraph.getJobName}) changed to " + - s"${newJobStatus}${optionalMessage}.") + s"${newJobStatus}${if(optionalMessage == null) "" else optionalMessage}.") if(newJobStatus.isTerminalState) { jobInfo.end = timeStamp http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd9a1ba4/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index b1cfda7..80af0e9 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -311,7 +311,9 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka libraryCacheManager.unregisterTask(jobID, executionID) } catch { case ioe: IOException => - log.debug(s"Unregistering the execution ${executionID} caused an IOException.") + if(log.isDebugEnabled) { + log.debug(s"Unregistering the execution ${executionID} caused an IOException.") + } } } @@ -326,11 +328,17 @@ class TaskManager(val 
connectionInfo: InstanceConnectionInfo, val jobManagerAkka case LogMemoryUsage => { memoryMXBean foreach { - mxbean => log.debug(TaskManager.getMemoryUsageStatsAsString(mxbean)) + mxbean => + if(log.isDebugEnabled) { + log.debug(TaskManager.getMemoryUsageStatsAsString(mxbean)) + } } gcMXBeans foreach { - mxbeans => log.debug(TaskManager.getGarbageCollectorStatsAsString(mxbeans)) + mxbeans => + if(log.isDebugEnabled) { + log.debug(TaskManager.getGarbageCollectorStatsAsString(mxbeans)) + } } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd9a1ba4/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala index 3c11e1f..5a51265 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala @@ -37,14 +37,6 @@ class TestingCluster(userConfiguration: Configuration) extends FlinkMiniCluster( cfg } - override def getJobManagerAkkaConfigString(): String = { - super.getJobManagerAkkaConfigString() + TestingUtils.getTestingSerializationBindings - } - - override def getTaskManagerAkkaConfigString(index: Int): String = { - super.getTaskManagerAkkaConfigString(index) + TestingUtils.getTestingSerializationBindings - } - override def startJobManager(implicit system: ActorSystem) = { system.actorOf(Props(new JobManager(configuration) with TestingJobManager), JobManager.JOB_MANAGER_NAME) http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd9a1ba4/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java ---------------------------------------------------------------------- diff --git 
a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java index c30d976..af92dbb 100644 --- a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java +++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java @@ -102,6 +102,7 @@ public abstract class AbstractTestBase { config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, TASK_MANAGER_MEMORY_SIZE); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, taskManagerNumSlots); config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers); + config.setInteger(ConfigConstants.AKKA_ASK_TIMEOUT, 1000000); this.executor = new ForkableFlinkMiniCluster(config); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd9a1ba4/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala index cf85533..8761ec6 100644 --- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala +++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala @@ -52,10 +52,6 @@ LocalFlinkMiniCluster(userConfiguration) { super.generateConfiguration(config) } - override def getTaskManagerAkkaConfigString(index: Int): String = { - super.getTaskManagerAkkaConfigString(index) + TestingUtils.getTestingSerializationBindings - } - override def startTaskManager(index: Int)(implicit system: ActorSystem): ActorRef = { val config = configuration.clone() http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd9a1ba4/flink-tests/src/test/resources/log4j-test.properties 
---------------------------------------------------------------------- diff --git a/flink-tests/src/test/resources/log4j-test.properties b/flink-tests/src/test/resources/log4j-test.properties index 2c2d4ff..0b686e5 100644 --- a/flink-tests/src/test/resources/log4j-test.properties +++ b/flink-tests/src/test/resources/log4j-test.properties @@ -17,7 +17,7 @@ ################################################################################ # Set root logger level to DEBUG and its only appender to A1. -log4j.rootLogger=DEBUG, A1 +log4j.rootLogger=OFF, A1 # A1 is set to be a ConsoleAppender. log4j.appender.A1=org.apache.log4j.ConsoleAppender http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd9a1ba4/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 6739948..3cbfaa7 100644 --- a/pom.xml +++ b/pom.xml @@ -81,7 +81,7 @@ under the License. <slf4j.version>1.7.7</slf4j.version> <guava.version>17.0</guava.version> <scala.version>2.10.4</scala.version> - <akka.version>2.3.6</akka.version> + <akka.version>2.3.7</akka.version> <scala.binary.version>2.10</scala.binary.version> <scala.macros.version>2.0.1</scala.macros.version> </properties> @@ -644,11 +644,11 @@ under the License. 
<exclude>**/resources/**/font-awesome/**</exclude> <exclude>**/resources/**/jquery*</exclude> <exclude>**/resources/**/bootstrap*</exclude> - <exclude>flink-clients/resources/web-docs/js/*d3.js</exclude> - <exclude>flink-runtime/resources/web-docs-infoserver/css/sb-admin.css</exclude> - <exclude>flink-runtime/resources/web-docs-infoserver/js/flot/*</exclude> - <exclude>flink-runtime/resources/web-docs-infoserver/js/jcanvas.min.js</exclude> - <exclude>flink-runtime/resources/web-docs-infoserver/js/timeline.js</exclude> + <exclude>flink-clients/src/main/resources/web-docs/js/*d3.js</exclude> + <exclude>flink-runtime/src/main/resources/web-docs-infoserver/css/sb-admin.css</exclude> + <exclude>flink-runtime/src/main/resources/web-docs-infoserver/js/flot/*</exclude> + <exclude>flink-runtime/src/main/resources/web-docs-infoserver/js/jcanvas.min.js</exclude> + <exclude>flink-runtime/src/main/resources/web-docs-infoserver/js/timeline.js</exclude> <!-- Test Data. --> <exclude>flink-tests/src/test/resources/testdata/terainput.txt</exclude> <exclude>flink-addons/flink-avro/src/test/resources/avro/user.avsc</exclude>
