[hotfix] Improve logging of SlotPool and SlotSharingManager
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f75dacca Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f75dacca Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f75dacca Branch: refs/heads/release-1.5 Commit: f75dacca2ebf16936eba85c7434a00bd0c2c5d50 Parents: e8b70cc Author: Till Rohrmann <trohrm...@apache.org> Authored: Thu Jul 19 13:41:03 2018 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Mon Jul 23 17:22:52 2018 +0200 ---------------------------------------------------------------------- .../clusterframework/types/AllocationID.java | 5 +++ .../flink/runtime/jobmaster/SlotRequestId.java | 5 +++ .../runtime/jobmaster/slotpool/SlotPool.java | 36 ++++++++++---------- .../jobmaster/slotpool/SlotSharingManager.java | 21 ++++++++++-- 4 files changed, 46 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f75dacca/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/AllocationID.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/AllocationID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/AllocationID.java index e722e9f..7004eff 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/AllocationID.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/AllocationID.java @@ -52,4 +52,9 @@ public class AllocationID extends AbstractID { public AllocationID(long lowerPart, long upperPart) { super(lowerPart, upperPart); } + + @Override + public String toString() { + return "AllocationID{" + super.toString() + '}'; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/f75dacca/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotRequestId.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotRequestId.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotRequestId.java index 203139c..5ac200d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotRequestId.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotRequestId.java @@ -40,4 +40,9 @@ public final class SlotRequestId extends AbstractID { } public SlotRequestId() {} + + @Override + public String toString() { + return "SlotRequestId{" + super.toString() + '}'; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/f75dacca/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java index 829c82e..13f0462 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java @@ -323,7 +323,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS boolean allowQueuedScheduling, Time allocationTimeout) { - log.debug("Allocating slot with request {} for task execution {}", slotRequestId, task.getTaskToExecute()); + log.debug("Received slot request [{}] for task: {}", slotRequestId, task.getTaskToExecute()); final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId(); @@ -686,7 +686,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS checkNotNull(resourceManagerGateway); checkNotNull(pendingRequest); - log.info("Requesting slot with profile {} from resource manager (request = {}).", pendingRequest.getResourceProfile(), pendingRequest.getSlotRequestId()); + log.info("Requesting new slot [{}] and profile {} from resource manager.", pendingRequest.getSlotRequestId(), pendingRequest.getResourceProfile()); final AllocationID allocationId = new AllocationID(); @@ -723,7 +723,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS "No pooled slot available and request to ResourceManager for new slot failed", failure)); } else { if (log.isDebugEnabled()) { - log.debug("Unregistered slot request {} failed.", slotRequestID, failure); + log.debug("Unregistered slot request [{}] failed.", slotRequestID, failure); } } } @@ -731,7 +731,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS private void stashRequestWaitingForResourceManager(final PendingRequest pendingRequest) { log.info("Cannot serve slot request, no ResourceManager connected. " + - "Adding as pending request {}", pendingRequest.getSlotRequestId()); + "Adding as pending request [{}]", pendingRequest.getSlotRequestId()); waitingForResourceManager.put(pendingRequest.getSlotRequestId(), pendingRequest); } @@ -742,7 +742,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS @Override public CompletableFuture<Acknowledge> releaseSlot(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) { - log.debug("Releasing slot with slot request id {} because of {}.", slotRequestId, cause != null ? cause.getMessage() : "null"); + log.debug("Releasing slot [{}] because: {}", slotRequestId, cause != null ? cause.getMessage() : "null"); if (slotSharingGroupId != null) { final SlotSharingManager multiTaskSlotManager = slotSharingManagers.get(slotSharingGroupId); @@ -753,7 +753,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS if (taskSlot != null) { taskSlot.release(cause); } else { - log.debug("Could not find slot {} in slot sharing group {}. Ignoring release slot request.", slotRequestId, slotSharingGroupId); + log.debug("Could not find slot [{}] in slot sharing group {}. Ignoring release slot request.", slotRequestId, slotSharingGroupId); } } else { log.debug("Could not find slot sharing group {}. Ignoring release slot request.", slotSharingGroupId); @@ -770,7 +770,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS allocatedSlot.releasePayload(cause); tryFulfillSlotRequestOrMakeAvailable(allocatedSlot); } else { - log.debug("There is no allocated slot with slot request id {}. Ignoring the release slot request.", slotRequestId); + log.debug("There is no allocated slot [{}]. Ignoring the release slot request.", slotRequestId); } } } @@ -801,11 +801,11 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS } private void failPendingRequest(PendingRequest pendingRequest, Exception e) { - Preconditions.checkNotNull(pendingRequest); - Preconditions.checkNotNull(e); + checkNotNull(pendingRequest); + checkNotNull(e); if (!pendingRequest.getAllocatedSlotFuture().isDone()) { - log.info("Failing pending request {}.", pendingRequest.getSlotRequestId()); + log.info("Failing pending slot request [{}]: {}", pendingRequest.getSlotRequestId(), e.getMessage()); pendingRequest.getAllocatedSlotFuture().completeExceptionally(e); } } @@ -833,7 +833,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS final PendingRequest pendingRequest = pollMatchingPendingRequest(allocatedSlot); if (pendingRequest != null) { - log.debug("Fulfilling pending request [{}] early with returned slot [{}]", + log.debug("Fulfilling pending slot request [{}] early with returned slot [{}]", pendingRequest.getSlotRequestId(), allocatedSlot.getAllocationId()); allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot); @@ -970,7 +970,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS allocatedSlots.remove(pendingRequest.getSlotRequestId()); tryFulfillSlotRequestOrMakeAvailable(allocatedSlot); } else { - log.debug("Fulfilled slot request {} with allocated slot {}.", pendingRequest.getSlotRequestId(), allocationID); + log.debug("Fulfilled slot request [{}] with allocated slot [{}].", pendingRequest.getSlotRequestId(), allocationID); } } else { @@ -1011,7 +1011,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS failPendingRequest(pendingRequest, cause); } else if (availableSlots.tryRemove(allocationID)) { - log.debug("Failed available slot with allocation id {}.", allocationID, cause); + log.debug("Failed available slot [{}].", allocationID, cause); } else { AllocatedSlot allocatedSlot = allocatedSlots.remove(allocationID); @@ -1021,7 +1021,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS allocatedSlot.releasePayload(cause); } else { - log.trace("Outdated request to fail slot with allocation id {}.", allocationID, cause); + log.trace("Outdated request to fail slot [{}].", allocationID, cause); } } // TODO: add some unit tests when the previous two are ready, the allocation may failed at any phase @@ -1068,7 +1068,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS @VisibleForTesting protected void timeoutPendingSlotRequest(SlotRequestId slotRequestId) { - log.info("Pending slot request {} timed out.", slotRequestId); + log.info("Pending slot request [{}] timed out.", slotRequestId); removePendingRequest(slotRequestId); } @@ -1109,7 +1109,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS final AllocationID allocationID = expiredSlot.getAllocationId(); if (availableSlots.tryRemove(allocationID)) { - log.info("Releasing idle slot {}.", allocationID); + log.info("Releasing idle slot [{}].", allocationID); final CompletableFuture<Acknowledge> freeSlotFuture = expiredSlot.getTaskManagerGateway().freeSlot( allocationID, cause, @@ -1119,12 +1119,12 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS (Acknowledge ignored, Throwable throwable) -> { if (throwable != null) { if (registeredTaskManagers.contains(expiredSlot.getTaskManagerId())) { - log.debug("Releasing slot {} of registered TaskExecutor {} failed. " + + log.debug("Releasing slot [{}] of registered TaskExecutor {} failed. " + "Trying to fulfill a different slot request.", allocationID, expiredSlot.getTaskManagerId(), throwable); tryFulfillSlotRequestOrMakeAvailable(expiredSlot); } else { - log.debug("Releasing slot {} failed and owning TaskExecutor {} is no " + + log.debug("Releasing slot [{}] failed and owning TaskExecutor {} is no " + "longer registered. Discarding slot.", allocationID, expiredSlot.getTaskManagerId()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/f75dacca/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java index eaa5787..afcd24f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java @@ -32,6 +32,9 @@ import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; @@ -77,6 +80,8 @@ import java.util.function.Function; */ public class SlotSharingManager { + private static final Logger LOG = LoggerFactory.getLogger(SlotSharingManager.class); + /** Lock for the internal data structures. */ private final Object lock = new Object(); @@ -143,6 +148,8 @@ public class SlotSharingManager { slotContextFuture, allocatedSlotRequestId); + LOG.debug("Create multi task slot [{}] in slot [{}].", slotRequestId, allocatedSlotRequestId); + allTaskSlots.put(slotRequestId, rootMultiTaskSlot); synchronized (lock) { @@ -158,6 +165,8 @@ public class SlotSharingManager { final MultiTaskSlot resolvedRootNode = unresolvedRootSlots.remove(slotRequestId); if (resolvedRootNode != null) { + LOG.trace("Fulfill multi task slot [{}] with slot [{}].", slotRequestId, slotContext.getAllocationId()); + final Set<MultiTaskSlot> innerCollection = resolvedRootSlots.computeIfAbsent( slotContext.getTaskManagerLocation(), taskManagerLocation -> new HashSet<>(4)); @@ -384,6 +393,8 @@ public class SlotSharingManager { MultiTaskSlot allocateMultiTaskSlot(SlotRequestId slotRequestId, AbstractID groupId) { Preconditions.checkState(!super.contains(groupId)); + LOG.debug("Create nested multi task slot [{}] in parent multi task slot [{}] for group [{}].", slotRequestId, getSlotRequestId(), groupId); + final MultiTaskSlot inner = new MultiTaskSlot( slotRequestId, groupId, @@ -412,6 +423,8 @@ public class SlotSharingManager { Locality locality) { Preconditions.checkState(!super.contains(groupId)); + LOG.debug("Create single task slot [{}] in multi task slot [{}] for group {}.", slotRequestId, getSlotRequestId(), groupId); + final SingleTaskSlot leaf = new SingleTaskSlot( slotRequestId, groupId, @@ -557,13 +570,15 @@ public class SlotSharingManager { Preconditions.checkNotNull(locality); singleLogicalSlotFuture = parent.getSlotContextFuture() .thenApply( - (SlotContext slotContext) -> - new SingleLogicalSlot( + (SlotContext slotContext) -> { + LOG.trace("Fulfill single task slot [{}] with slot [{}].", slotRequestId, slotContext.getAllocationId()); + return new SingleLogicalSlot( slotRequestId, slotContext, slotSharingGroupId, locality, - slotOwner)); + slotOwner); + }); } CompletableFuture<LogicalSlot> getLogicalSlotFuture() {