This is an automated email from the ASF dual-hosted git repository.

guoyangze pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new da532423487 [FLINK-27256][runtime] Log the root exception in closing 
the task manager connection
da532423487 is described below

commit da532423487e0534b5fe61f5a02366833f76193a
Author: Yangze Guo <karma...@gmail.com>
AuthorDate: Fri Apr 15 10:27:27 2022 +0800

    [FLINK-27256][runtime] Log the root exception in closing the task manager 
connection
    
    This closes #19481.
---
 .../java/org/apache/flink/util/ExceptionUtils.java | 24 +++++++++++++++
 .../apache/flink/util/FlinkExpectedException.java  | 36 ++++++++++++++++++++++
 .../apache/flink/runtime/jobmaster/JobMaster.java  |  6 ++--
 .../runtime/resourcemanager/ResourceManager.java   |  4 ++-
 .../slotmanager/FineGrainedSlotManager.java        |  8 +++--
 .../slotmanager/TaskExecutorManager.java           |  8 +++--
 .../flink/runtime/taskexecutor/TaskExecutor.java   | 28 +++++++++--------
 7 files changed, 92 insertions(+), 22 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java 
b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index c6fbd4f0230..96443d19e7f 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -29,6 +29,8 @@ package org.apache.flink.util;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.util.function.RunnableWithException;
 
+import org.slf4j.Logger;
+
 import javax.annotation.Nullable;
 
 import java.io.IOException;
@@ -678,6 +680,28 @@ public final class ExceptionUtils {
         }
     }
 
+    /**
+     * Return the given exception if it is not a {@link 
FlinkExpectedException}, or {@code null} if it is.
+     *
+     * @param e the given exception
+     * @return the given exception if it is not a {@link 
FlinkExpectedException}, otherwise {@code null}
+     */
+    public static Throwable returnExceptionIfUnexpected(Throwable e) {
+        return e instanceof FlinkExpectedException ? null : e;
+    }
+
+    /**
+     * Log the given exception at debug level if it is a {@link 
FlinkExpectedException}.
+     *
+     * @param e the given exception
+     * @param log logger
+     */
+    public static void logExceptionIfExcepted(Throwable e, Logger log) {
+        if (e instanceof FlinkExpectedException) {
+            log.debug("Expected exception.", e);
+        }
+    }
+
     // ------------------------------------------------------------------------
     //  Lambda exception utilities
     // ------------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/util/FlinkExpectedException.java 
b/flink-core/src/main/java/org/apache/flink/util/FlinkExpectedException.java
new file mode 100644
index 00000000000..ef22026f158
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/FlinkExpectedException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+/** This class is just used to pass the diagnostic message of some expected 
procedure. */
+public class FlinkExpectedException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    public FlinkExpectedException(String message) {
+        super(message);
+    }
+
+    public FlinkExpectedException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public FlinkExpectedException(Throwable cause) {
+        super(cause);
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index af8997465e3..1884d33230c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -496,10 +496,12 @@ public class JobMaster extends 
PermanentlyFencedRpcEndpoint<JobMasterId>
     @Override
     public CompletableFuture<Acknowledge> disconnectTaskManager(
             final ResourceID resourceID, final Exception cause) {
-        log.debug(
+        log.info(
                 "Disconnect TaskExecutor {} because: {}",
                 resourceID.getStringWithMetadata(),
-                cause.getMessage());
+                cause.getMessage(),
+                ExceptionUtils.returnExceptionIfUnexpected(cause.getCause()));
+        ExceptionUtils.logExceptionIfExcepted(cause.getCause(), log);
 
         taskManagerHeartbeatManager.unmonitorTarget(resourceID);
         slotPoolService.releaseTaskManager(resourceID, cause);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index f4834efbc3d..3cd99ca0cf7 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -1014,7 +1014,9 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
             log.info(
                     "Closing TaskExecutor connection {} because: {}",
                     resourceID.getStringWithMetadata(),
-                    cause.getMessage());
+                    cause.getMessage(),
+                    
ExceptionUtils.returnExceptionIfUnexpected(cause.getCause()));
+            ExceptionUtils.logExceptionIfExcepted(cause.getCause(), log);
 
             // TODO :: suggest failed task executor to stop itself
             
slotManager.unregisterTaskManager(workerRegistration.getInstanceID(), cause);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
index 8872d8c3bab..f3d0b3fcb79 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
@@ -37,7 +37,7 @@ import org.apache.flink.runtime.slots.ResourceRequirement;
 import org.apache.flink.runtime.slots.ResourceRequirements;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.util.ResourceCounter;
-import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.FlinkExpectedException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.concurrent.ScheduledExecutor;
@@ -332,7 +332,8 @@ public class FineGrainedSlotManager implements SlotManager {
                         maxTotalMem.toHumanReadableString());
                 resourceActions.releaseResource(
                         taskExecutorConnection.getInstanceID(),
-                        new FlinkException("The max total resource limitation 
is reached."));
+                        new FlinkExpectedException(
+                                "The max total resource limitation is 
reached."));
                 return false;
             }
 
@@ -707,7 +708,8 @@ public class FineGrainedSlotManager implements SlotManager {
     }
 
     private void releaseIdleTaskExecutor(InstanceID timedOutTaskManagerId) {
-        final FlinkException cause = new FlinkException("TaskManager exceeded 
the idle timeout.");
+        final FlinkExpectedException cause =
+                new FlinkExpectedException("TaskManager exceeded the idle 
timeout.");
         resourceActions.releaseResource(timedOutTaskManagerId, cause);
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
index ebe61ace658..1df5a872ee4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
@@ -27,7 +27,7 @@ import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnect
 import org.apache.flink.runtime.slots.ResourceRequirement;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
-import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.FlinkExpectedException;
 import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.concurrent.ScheduledExecutor;
@@ -159,7 +159,8 @@ class TaskExecutorManager implements AutoCloseable {
                     maxSlotNum);
             resourceActions.releaseResource(
                     taskExecutorConnection.getInstanceID(),
-                    new FlinkException("The total number of slots exceeds the 
max limitation."));
+                    new FlinkExpectedException(
+                            "The total number of slots exceeds the max 
limitation."));
             return false;
         }
 
@@ -400,7 +401,8 @@ class TaskExecutorManager implements AutoCloseable {
     }
 
     private void releaseIdleTaskExecutor(InstanceID timedOutTaskManagerId) {
-        final FlinkException cause = new FlinkException("TaskExecutor exceeded 
the idle timeout.");
+        final FlinkExpectedException cause =
+                new FlinkExpectedException("TaskExecutor exceeded the idle 
timeout.");
         LOG.debug(
                 "Release TaskExecutor {} because it exceeded the idle 
timeout.",
                 timedOutTaskManagerId);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index b7e6ee28578..10b5b6eae61 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -132,6 +132,7 @@ import 
org.apache.flink.runtime.webmonitor.threadinfo.ThreadInfoSamplesRequest;
 import org.apache.flink.types.SerializableOptional;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.FlinkExpectedException;
 import org.apache.flink.util.OptionalConsumer;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
@@ -453,7 +454,8 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
 
         Throwable jobManagerDisconnectThrowable = null;
 
-        FlinkException cause = new FlinkException("The TaskExecutor is 
shutting down.");
+        FlinkExpectedException cause =
+                new FlinkExpectedException("The TaskExecutor is shutting 
down.");
 
         closeResourceManagerConnection(cause);
 
@@ -1414,11 +1416,11 @@ public class TaskExecutor extends RpcEndpoint 
implements TaskExecutorGateway {
             final ResourceID resourceManagerResourceId =
                     
establishedResourceManagerConnection.getResourceManagerResourceId();
 
-            if (log.isDebugEnabled()) {
-                log.debug("Close ResourceManager connection {}.", 
resourceManagerResourceId, cause);
-            } else {
-                log.info("Close ResourceManager connection {}.", 
resourceManagerResourceId);
-            }
+            log.info(
+                    "Close ResourceManager connection {}.",
+                    resourceManagerResourceId,
+                    
ExceptionUtils.returnExceptionIfUnexpected(cause.getCause()));
+            ExceptionUtils.logExceptionIfExcepted(cause.getCause(), log);
             
resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerResourceId);
 
             ResourceManagerGateway resourceManagerGateway =
@@ -1665,11 +1667,11 @@ public class TaskExecutor extends RpcEndpoint 
implements TaskExecutorGateway {
     private void disconnectJobManagerConnection(
             JobTable.Connection jobManagerConnection, Exception cause) {
         final JobID jobId = jobManagerConnection.getJobId();
-        if (log.isDebugEnabled()) {
-            log.debug("Close JobManager connection for job {}.", jobId, cause);
-        } else {
-            log.info("Close JobManager connection for job {}.", jobId);
-        }
+        log.info(
+                "Close JobManager connection for job {}.",
+                jobId,
+                ExceptionUtils.returnExceptionIfUnexpected(cause.getCause()));
+        ExceptionUtils.logExceptionIfExcepted(cause.getCause(), log);
 
         // 1. fail tasks running under this JobID
         Iterator<Task> tasks = taskSlotTable.getTasks(jobId);
@@ -1970,8 +1972,8 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                 && !partitionTracker.isTrackingPartitionsFor(jobId)) {
             // we can remove the job from the job leader service
 
-            final FlinkException cause =
-                    new FlinkException(
+            final FlinkExpectedException cause =
+                    new FlinkExpectedException(
                             "TaskExecutor "
                                     + getAddress()
                                     + " has no more allocated slots for job "

Reply via email to