[hotfix] Improve logging of SlotPool and SlotSharingManager

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f75dacca
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f75dacca
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f75dacca

Branch: refs/heads/release-1.5
Commit: f75dacca2ebf16936eba85c7434a00bd0c2c5d50
Parents: e8b70cc
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Thu Jul 19 13:41:03 2018 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Mon Jul 23 17:22:52 2018 +0200

----------------------------------------------------------------------
 .../clusterframework/types/AllocationID.java    |  5 +++
 .../flink/runtime/jobmaster/SlotRequestId.java  |  5 +++
 .../runtime/jobmaster/slotpool/SlotPool.java    | 36 ++++++++++----------
 .../jobmaster/slotpool/SlotSharingManager.java  | 21 ++++++++++--
 4 files changed, 46 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f75dacca/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/AllocationID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/AllocationID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/AllocationID.java
index e722e9f..7004eff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/AllocationID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/AllocationID.java
@@ -52,4 +52,9 @@ public class AllocationID extends AbstractID {
        public AllocationID(long lowerPart, long upperPart) {
                super(lowerPart, upperPart);
        }
+
+       @Override
+       public String toString() {
+               return "AllocationID{" + super.toString() + '}';
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f75dacca/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotRequestId.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotRequestId.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotRequestId.java
index 203139c..5ac200d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotRequestId.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotRequestId.java
@@ -40,4 +40,9 @@ public final class SlotRequestId extends AbstractID {
        }
 
        public SlotRequestId() {}
+
+       @Override
+       public String toString() {
+               return "SlotRequestId{" + super.toString() + '}';
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f75dacca/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index 829c82e..13f0462 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -323,7 +323,7 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                        boolean allowQueuedScheduling,
                        Time allocationTimeout) {
 
-               log.debug("Allocating slot with request {} for task execution 
{}", slotRequestId, task.getTaskToExecute());
+               log.debug("Received slot request [{}] for task: {}", 
slotRequestId, task.getTaskToExecute());
 
                final SlotSharingGroupId slotSharingGroupId = 
task.getSlotSharingGroupId();
 
@@ -686,7 +686,7 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                checkNotNull(resourceManagerGateway);
                checkNotNull(pendingRequest);
 
-               log.info("Requesting slot with profile {} from resource manager 
(request = {}).", pendingRequest.getResourceProfile(), 
pendingRequest.getSlotRequestId());
+               log.info("Requesting new slot [{}] and profile {} from resource 
manager.", pendingRequest.getSlotRequestId(), 
pendingRequest.getResourceProfile());
 
                final AllocationID allocationId = new AllocationID();
 
@@ -723,7 +723,7 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                                        "No pooled slot available and request 
to ResourceManager for new slot failed", failure));
                } else {
                        if (log.isDebugEnabled()) {
-                               log.debug("Unregistered slot request {} 
failed.", slotRequestID, failure);
+                               log.debug("Unregistered slot request [{}] 
failed.", slotRequestID, failure);
                        }
                }
        }
@@ -731,7 +731,7 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
        private void stashRequestWaitingForResourceManager(final PendingRequest 
pendingRequest) {
 
                log.info("Cannot serve slot request, no ResourceManager 
connected. " +
-                               "Adding as pending request {}",  
pendingRequest.getSlotRequestId());
+                               "Adding as pending request [{}]",  
pendingRequest.getSlotRequestId());
 
                
waitingForResourceManager.put(pendingRequest.getSlotRequestId(), 
pendingRequest);
        }
@@ -742,7 +742,7 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
 
        @Override
        public CompletableFuture<Acknowledge> releaseSlot(SlotRequestId 
slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable 
cause) {
-               log.debug("Releasing slot with slot request id {} because of 
{}.", slotRequestId, cause != null ? cause.getMessage() : "null");
+               log.debug("Releasing slot [{}] because: {}", slotRequestId, 
cause != null ? cause.getMessage() : "null");
 
                if (slotSharingGroupId != null) {
                        final SlotSharingManager multiTaskSlotManager = 
slotSharingManagers.get(slotSharingGroupId);
@@ -753,7 +753,7 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                                if (taskSlot != null) {
                                        taskSlot.release(cause);
                                } else {
-                                       log.debug("Could not find slot {} in 
slot sharing group {}. Ignoring release slot request.", slotRequestId, 
slotSharingGroupId);
+                                       log.debug("Could not find slot [{}] in 
slot sharing group {}. Ignoring release slot request.", slotRequestId, 
slotSharingGroupId);
                                }
                        } else {
                                log.debug("Could not find slot sharing group 
{}. Ignoring release slot request.", slotSharingGroupId);
@@ -770,7 +770,7 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                                        allocatedSlot.releasePayload(cause);
                                        
tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
                                } else {
-                                       log.debug("There is no allocated slot 
with slot request id {}. Ignoring the release slot request.", slotRequestId);
+                                       log.debug("There is no allocated slot 
[{}]. Ignoring the release slot request.", slotRequestId);
                                }
                        }
                }
@@ -801,11 +801,11 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
        }
 
        private void failPendingRequest(PendingRequest pendingRequest, 
Exception e) {
-               Preconditions.checkNotNull(pendingRequest);
-               Preconditions.checkNotNull(e);
+               checkNotNull(pendingRequest);
+               checkNotNull(e);
 
                if (!pendingRequest.getAllocatedSlotFuture().isDone()) {
-                       log.info("Failing pending request {}.", 
pendingRequest.getSlotRequestId());
+                       log.info("Failing pending slot request [{}]: {}", 
pendingRequest.getSlotRequestId(), e.getMessage());
                        
pendingRequest.getAllocatedSlotFuture().completeExceptionally(e);
                }
        }
@@ -833,7 +833,7 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                final PendingRequest pendingRequest = 
pollMatchingPendingRequest(allocatedSlot);
 
                if (pendingRequest != null) {
-                       log.debug("Fulfilling pending request [{}] early with 
returned slot [{}]",
+                       log.debug("Fulfilling pending slot request [{}] early 
with returned slot [{}]",
                                pendingRequest.getSlotRequestId(), 
allocatedSlot.getAllocationId());
 
                        allocatedSlots.add(pendingRequest.getSlotRequestId(), 
allocatedSlot);
@@ -970,7 +970,7 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                                
allocatedSlots.remove(pendingRequest.getSlotRequestId());
                                
tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
                        } else {
-                               log.debug("Fulfilled slot request {} with 
allocated slot {}.", pendingRequest.getSlotRequestId(), allocationID);
+                               log.debug("Fulfilled slot request [{}] with 
allocated slot [{}].", pendingRequest.getSlotRequestId(), allocationID);
                        }
                }
                else {
@@ -1011,7 +1011,7 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                        failPendingRequest(pendingRequest, cause);
                }
                else if (availableSlots.tryRemove(allocationID)) {
-                       log.debug("Failed available slot with allocation id 
{}.", allocationID, cause);
+                       log.debug("Failed available slot [{}].", allocationID, 
cause);
                }
                else {
                        AllocatedSlot allocatedSlot = 
allocatedSlots.remove(allocationID);
@@ -1021,7 +1021,7 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                                allocatedSlot.releasePayload(cause);
                        }
                        else {
-                               log.trace("Outdated request to fail slot with 
allocation id {}.", allocationID, cause);
+                               log.trace("Outdated request to fail slot 
[{}].", allocationID, cause);
                        }
                }
                // TODO: add some unit tests when the previous two are ready, 
the allocation may failed at any phase
@@ -1068,7 +1068,7 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
 
        @VisibleForTesting
        protected void timeoutPendingSlotRequest(SlotRequestId slotRequestId) {
-               log.info("Pending slot request {} timed out.", slotRequestId);
+               log.info("Pending slot request [{}] timed out.", slotRequestId);
                removePendingRequest(slotRequestId);
        }
 
@@ -1109,7 +1109,7 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                        final AllocationID allocationID = 
expiredSlot.getAllocationId();
                        if (availableSlots.tryRemove(allocationID)) {
 
-                               log.info("Releasing idle slot {}.", 
allocationID);
+                               log.info("Releasing idle slot [{}].", 
allocationID);
                                final CompletableFuture<Acknowledge> 
freeSlotFuture = expiredSlot.getTaskManagerGateway().freeSlot(
                                        allocationID,
                                        cause,
@@ -1119,12 +1119,12 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                                        (Acknowledge ignored, Throwable 
throwable) -> {
                                                if (throwable != null) {
                                                        if 
(registeredTaskManagers.contains(expiredSlot.getTaskManagerId())) {
-                                                               
log.debug("Releasing slot {} of registered TaskExecutor {} failed. " +
+                                                               
log.debug("Releasing slot [{}] of registered TaskExecutor {} failed. " +
                                                                        "Trying 
to fulfill a different slot request.", allocationID, 
expiredSlot.getTaskManagerId(),
                                                                        
throwable);
                                                                
tryFulfillSlotRequestOrMakeAvailable(expiredSlot);
                                                        } else {
-                                                               
log.debug("Releasing slot {} failed and owning TaskExecutor {} is no " +
+                                                               
log.debug("Releasing slot [{}] failed and owning TaskExecutor {} is no " +
                                                                        "longer 
registered. Discarding slot.", allocationID, expiredSlot.getTaskManagerId());
                                                        }
                                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/f75dacca/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
index eaa5787..afcd24f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
@@ -32,6 +32,9 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 
@@ -77,6 +80,8 @@ import java.util.function.Function;
  */
 public class SlotSharingManager {
 
+       private static final Logger LOG = 
LoggerFactory.getLogger(SlotSharingManager.class);
+
        /** Lock for the internal data structures. */
        private final Object lock = new Object();
 
@@ -143,6 +148,8 @@ public class SlotSharingManager {
                        slotContextFuture,
                        allocatedSlotRequestId);
 
+               LOG.debug("Create multi task slot [{}] in slot [{}].", 
slotRequestId, allocatedSlotRequestId);
+
                allTaskSlots.put(slotRequestId, rootMultiTaskSlot);
 
                synchronized (lock) {
@@ -158,6 +165,8 @@ public class SlotSharingManager {
                                                final MultiTaskSlot 
resolvedRootNode = unresolvedRootSlots.remove(slotRequestId);
 
                                                if (resolvedRootNode != null) {
+                                                       LOG.trace("Fulfill 
multi task slot [{}] with slot [{}].", slotRequestId, 
slotContext.getAllocationId());
+
                                                        final 
Set<MultiTaskSlot> innerCollection = resolvedRootSlots.computeIfAbsent(
                                                                
slotContext.getTaskManagerLocation(),
                                                                
taskManagerLocation -> new HashSet<>(4));
@@ -384,6 +393,8 @@ public class SlotSharingManager {
                MultiTaskSlot allocateMultiTaskSlot(SlotRequestId 
slotRequestId, AbstractID groupId) {
                        Preconditions.checkState(!super.contains(groupId));
 
+                       LOG.debug("Create nested multi task slot [{}] in parent 
multi task slot [{}] for group [{}].", slotRequestId, getSlotRequestId(), 
groupId);
+
                        final MultiTaskSlot inner = new MultiTaskSlot(
                                slotRequestId,
                                groupId,
@@ -412,6 +423,8 @@ public class SlotSharingManager {
                                Locality locality) {
                        Preconditions.checkState(!super.contains(groupId));
 
+                       LOG.debug("Create single task slot [{}] in multi task 
slot [{}] for group {}.", slotRequestId, getSlotRequestId(), groupId);
+
                        final SingleTaskSlot leaf = new SingleTaskSlot(
                                slotRequestId,
                                groupId,
@@ -557,13 +570,15 @@ public class SlotSharingManager {
                        Preconditions.checkNotNull(locality);
                        singleLogicalSlotFuture = parent.getSlotContextFuture()
                                .thenApply(
-                                       (SlotContext slotContext) ->
-                                               new SingleLogicalSlot(
+                                       (SlotContext slotContext) -> {
+                                               LOG.trace("Fulfill single task 
slot [{}] with slot [{}].", slotRequestId, slotContext.getAllocationId());
+                                               return new SingleLogicalSlot(
                                                        slotRequestId,
                                                        slotContext,
                                                        slotSharingGroupId,
                                                        locality,
-                                                       slotOwner));
+                                                       slotOwner);
+                                       });
                }
 
                CompletableFuture<LogicalSlot> getLogicalSlotFuture() {

Reply via email to