This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 41769a0  [FLINK-12057] Refactor MemoryLogger to accept termination future instead of ActorSystem
41769a0 is described below

commit 41769a0e2d8f8e336685641ee2f3639b00777839
Author: Chesnay Schepler <ches...@apache.org>
AuthorDate: Thu Mar 28 15:20:25 2019 +0100

    [FLINK-12057] Refactor MemoryLogger to accept termination future instead of ActorSystem
---
 .../runtime/taskexecutor/TaskManagerRunner.java      |  2 +-
 .../flink/runtime/taskmanager/MemoryLogger.java      | 20 ++++++++++----------
 .../flink/runtime/taskmanager/TaskManager.scala      |  2 --
 3 files changed, 11 insertions(+), 13 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index c8a8a1f..889221b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -159,7 +159,7 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
                this.terminationFuture = new CompletableFuture<>();
                this.shutdown = false;
 
-               MemoryLogger.startIfConfigured(LOG, configuration, metricQueryServiceActorSystem);
+               MemoryLogger.startIfConfigured(LOG, configuration, terminationFuture);
        }
 
        // --------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java
index 91849d7..41b5985 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.taskmanager;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 
-import akka.actor.ActorSystem;
 import org.slf4j.Logger;
 
 import javax.management.MBeanServer;
@@ -34,6 +33,7 @@ import java.lang.management.MemoryPoolMXBean;
 import java.lang.management.MemoryType;
 import java.lang.management.MemoryUsage;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * A thread the periodically logs statistics about:
@@ -57,14 +57,14 @@ public class MemoryLogger extends Thread {
 
        private final BufferPoolMXBean directBufferBean;
        
-       private final ActorSystem monitored;
+       private final CompletableFuture<Void> monitored;
        
        private volatile boolean running = true;
 
        public static void startIfConfigured(
                        Logger logger,
                        Configuration configuration,
-                       ActorSystem taskManagerSystem) {
+                       CompletableFuture<Void> taskManagerTerminationFuture) {
                if (!logger.isInfoEnabled() || !configuration.getBoolean(TaskManagerOptions.DEBUG_MEMORY_LOG)) {
                        return;
                }
@@ -73,19 +73,19 @@ public class MemoryLogger extends Thread {
                new MemoryLogger(
                        logger,
                        configuration.getLong(TaskManagerOptions.DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS),
-                       taskManagerSystem).start();
+                       taskManagerTerminationFuture).start();
        }
        
        /**
-        * Creates a new memory logger that logs in the given interval and lives as long as the
-        * given actor system.
+        * Creates a new memory logger that logs in the given interval and lives until the
+        * given termination future completes.
         *
         * @param logger The logger to use for outputting the memory statistics.
         * @param interval The interval in which the thread logs.
-        * @param monitored The actor system to whose life the thread is bound. The thread terminates
-        *                  once the actor system terminates.
+        * @param monitored termination future for the system to whose life the thread is bound. The thread terminates
+        *                  once the system terminates.
         */
-       public MemoryLogger(Logger logger, long interval, ActorSystem monitored) {
+       public MemoryLogger(Logger logger, long interval, CompletableFuture<Void> monitored) {
                super("Memory Logger");
                setDaemon(true);
                setPriority(Thread.MIN_PRIORITY);
@@ -125,7 +125,7 @@ public class MemoryLogger extends Thread {
        @Override
        public void run() {
                try {
-                       while (running && (monitored == null || !monitored.whenTerminated().isCompleted())) {
+                       while (running && (monitored == null || !monitored.isDone())) {
                                logger.info(getMemoryUsageStatsAsString(memoryBean));
                                logger.info(getDirectMemoryStatsAsString(directBufferBean));
                                logger.info(getMemoryPoolStatsAsString(poolBeans));
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 02f1753..35e6c28 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1885,8 +1885,6 @@ object TaskManager {
         )
       }
 
-      MemoryLogger.startIfConfigured(LOG.logger, configuration, taskManagerSystem)
-
       // block until everything is done
       Await.ready(taskManagerSystem.whenTerminated, Duration.Inf)
     } catch {

Reply via email to