This is an automated email from the ASF dual-hosted git repository. guoyangze pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new da532423487 [FLINK-27256][runtime] Log the root exception in closing the task manager connection da532423487 is described below commit da532423487e0534b5fe61f5a02366833f76193a Author: Yangze Guo <karma...@gmail.com> AuthorDate: Fri Apr 15 10:27:27 2022 +0800 [FLINK-27256][runtime] Log the root exception in closing the task manager connection This closes #19481. --- .../java/org/apache/flink/util/ExceptionUtils.java | 24 +++++++++++++++ .../apache/flink/util/FlinkExpectedException.java | 36 ++++++++++++++++++++++ .../apache/flink/runtime/jobmaster/JobMaster.java | 6 ++-- .../runtime/resourcemanager/ResourceManager.java | 4 ++- .../slotmanager/FineGrainedSlotManager.java | 8 +++-- .../slotmanager/TaskExecutorManager.java | 8 +++-- .../flink/runtime/taskexecutor/TaskExecutor.java | 28 +++++++++-------- 7 files changed, 92 insertions(+), 22 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java index c6fbd4f0230..96443d19e7f 100644 --- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java @@ -29,6 +29,8 @@ package org.apache.flink.util; import org.apache.flink.annotation.Internal; import org.apache.flink.util.function.RunnableWithException; +import org.slf4j.Logger; + import javax.annotation.Nullable; import java.io.IOException; @@ -678,6 +680,28 @@ public final class ExceptionUtils { } } + /** + * Return the given exception if it is not a {@link FlinkExpectedException}. + * + * @param e the given exception + * @return the given exception if it is not a {@link FlinkExpectedException} + */ + public static Throwable returnExceptionIfUnexpected(Throwable e) { + return e instanceof FlinkExpectedException ? null : e; + } + + /** + * Log the given exception in debug level if it is a {@link FlinkExpectedException}. + * + * @param e the given exception + * @param log logger + */ + public static void logExceptionIfExcepted(Throwable e, Logger log) { + if (e instanceof FlinkExpectedException) { + log.debug("Expected exception.", e); + } + } + // ------------------------------------------------------------------------ // Lambda exception utilities // ------------------------------------------------------------------------ diff --git a/flink-core/src/main/java/org/apache/flink/util/FlinkExpectedException.java b/flink-core/src/main/java/org/apache/flink/util/FlinkExpectedException.java new file mode 100644 index 00000000000..ef22026f158 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/util/FlinkExpectedException.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +/** This class is just used to pass diagnostic message of some excepted procedure. */ +public class FlinkExpectedException extends Exception { + private static final long serialVersionUID = 1L; + + public FlinkExpectedException(String message) { + super(message); + } + + public FlinkExpectedException(String message, Throwable cause) { + super(message, cause); + } + + public FlinkExpectedException(Throwable cause) { + super(cause); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index af8997465e3..1884d33230c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -496,10 +496,12 @@ public class JobMaster extends PermanentlyFencedRpcEndpoint<JobMasterId> @Override public CompletableFuture<Acknowledge> disconnectTaskManager( final ResourceID resourceID, final Exception cause) { - log.debug( + log.info( "Disconnect TaskExecutor {} because: {}", resourceID.getStringWithMetadata(), - cause.getMessage()); + cause.getMessage(), + ExceptionUtils.returnExceptionIfUnexpected(cause.getCause())); + ExceptionUtils.logExceptionIfExcepted(cause.getCause(), log); taskManagerHeartbeatManager.unmonitorTarget(resourceID); slotPoolService.releaseTaskManager(resourceID, cause); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index f4834efbc3d..3cd99ca0cf7 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -1014,7 +1014,9 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> log.info( "Closing TaskExecutor connection {} because: {}", resourceID.getStringWithMetadata(), - cause.getMessage()); + cause.getMessage(), + ExceptionUtils.returnExceptionIfUnexpected(cause.getCause())); + ExceptionUtils.logExceptionIfExcepted(cause.getCause(), log); // TODO :: suggest failed task executor to stop itself slotManager.unregisterTaskManager(workerRegistration.getInstanceID(), cause); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java index 8872d8c3bab..f3d0b3fcb79 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java @@ -37,7 +37,7 @@ import org.apache.flink.runtime.slots.ResourceRequirement; import org.apache.flink.runtime.slots.ResourceRequirements; import org.apache.flink.runtime.taskexecutor.SlotReport; import org.apache.flink.runtime.util.ResourceCounter; -import org.apache.flink.util.FlinkException; +import org.apache.flink.util.FlinkExpectedException; import org.apache.flink.util.Preconditions; import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.concurrent.ScheduledExecutor; @@ -332,7 +332,8 @@ public class FineGrainedSlotManager implements SlotManager { maxTotalMem.toHumanReadableString()); resourceActions.releaseResource( taskExecutorConnection.getInstanceID(), - new FlinkException("The max total resource limitation is reached.")); + new FlinkExpectedException( + "The max total resource limitation is reached.")); return false; } @@ -707,7 +708,8 @@ public class FineGrainedSlotManager implements SlotManager { } private void releaseIdleTaskExecutor(InstanceID timedOutTaskManagerId) { - final FlinkException cause = new FlinkException("TaskManager exceeded the idle timeout."); + final FlinkExpectedException cause = + new FlinkExpectedException("TaskManager exceeded the idle timeout."); resourceActions.releaseResource(timedOutTaskManagerId, cause); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java index ebe61ace658..1df5a872ee4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java @@ -27,7 +27,7 @@ import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnect import org.apache.flink.runtime.slots.ResourceRequirement; import org.apache.flink.runtime.taskexecutor.SlotReport; import org.apache.flink.runtime.taskexecutor.SlotStatus; -import org.apache.flink.util.FlinkException; +import org.apache.flink.util.FlinkExpectedException; import org.apache.flink.util.MathUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.util.concurrent.ScheduledExecutor; @@ -159,7 +159,8 @@ class TaskExecutorManager implements AutoCloseable { maxSlotNum); resourceActions.releaseResource( taskExecutorConnection.getInstanceID(), - new FlinkException("The total number of slots exceeds the max limitation.")); + new FlinkExpectedException( + "The total number of slots exceeds the max limitation.")); return false; } @@ -400,7 +401,8 @@ class TaskExecutorManager implements AutoCloseable { } private void releaseIdleTaskExecutor(InstanceID timedOutTaskManagerId) { - final FlinkException cause = new FlinkException("TaskExecutor exceeded the idle timeout."); + final FlinkExpectedException cause = + new FlinkExpectedException("TaskExecutor exceeded the idle timeout."); LOG.debug( "Release TaskExecutor {} because it exceeded the idle timeout.", timedOutTaskManagerId); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index b7e6ee28578..10b5b6eae61 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -132,6 +132,7 @@ import org.apache.flink.runtime.webmonitor.threadinfo.ThreadInfoSamplesRequest; import org.apache.flink.types.SerializableOptional; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; +import org.apache.flink.util.FlinkExpectedException; import org.apache.flink.util.OptionalConsumer; import org.apache.flink.util.Preconditions; import org.apache.flink.util.SerializedValue; @@ -453,7 +454,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { Throwable jobManagerDisconnectThrowable = null; - FlinkException cause = new FlinkException("The TaskExecutor is shutting down."); + FlinkExpectedException cause = + new FlinkExpectedException("The TaskExecutor is shutting down."); closeResourceManagerConnection(cause); @@ -1414,11 +1416,11 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { final ResourceID resourceManagerResourceId = establishedResourceManagerConnection.getResourceManagerResourceId(); - if (log.isDebugEnabled()) { - log.debug("Close ResourceManager connection {}.", resourceManagerResourceId, cause); - } else { - log.info("Close ResourceManager connection {}.", resourceManagerResourceId); - } + log.info( + "Close ResourceManager connection {}.", + resourceManagerResourceId, + ExceptionUtils.returnExceptionIfUnexpected(cause.getCause())); + ExceptionUtils.logExceptionIfExcepted(cause.getCause(), log); resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerResourceId); ResourceManagerGateway resourceManagerGateway = @@ -1665,11 +1667,11 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { private void disconnectJobManagerConnection( JobTable.Connection jobManagerConnection, Exception cause) { final JobID jobId = jobManagerConnection.getJobId(); - if (log.isDebugEnabled()) { - log.debug("Close JobManager connection for job {}.", jobId, cause); - } else { - log.info("Close JobManager connection for job {}.", jobId); - } + log.info( + "Close JobManager connection for job {}.", + jobId, + ExceptionUtils.returnExceptionIfUnexpected(cause.getCause())); + ExceptionUtils.logExceptionIfExcepted(cause.getCause(), log); // 1. fail tasks running under this JobID Iterator<Task> tasks = taskSlotTable.getTasks(jobId); @@ -1970,8 +1972,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { && !partitionTracker.isTrackingPartitionsFor(jobId)) { // we can remove the job from the job leader service - final FlinkException cause = - new FlinkException( + final FlinkExpectedException cause = + new FlinkExpectedException( "TaskExecutor " + getAddress() + " has no more allocated slots for job "