This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit f3758f3d2f898706bd3cb40699e3df75e34f112d Author: Till Rohrmann <trohrm...@apache.org> AuthorDate: Tue Feb 19 11:53:00 2019 +0100 [hotfix] Remove unused methods from TaskManagerGateway --- .../jobmanager/slots/ActorTaskManagerGateway.java | 37 ------------------ .../jobmanager/slots/TaskManagerGateway.java | 44 ---------------------- .../runtime/jobmaster/RpcTaskManagerGateway.java | 33 ---------------- .../utils/SimpleAckingTaskManagerGateway.java | 37 ------------------ 4 files changed, 151 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java index 6b88752..54c11ff 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java @@ -22,18 +22,13 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.blob.TransientBlobKey; import org.apache.flink.runtime.checkpoint.CheckpointOptions; -import org.apache.flink.runtime.clusterframework.ApplicationStatus; -import org.apache.flink.runtime.clusterframework.messages.StopCluster; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.PartitionInfo; import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.messages.Messages; -import org.apache.flink.runtime.messages.StackTrace; import org.apache.flink.runtime.messages.StackTraceSampleMessages; import org.apache.flink.runtime.messages.StackTraceSampleResponse; import org.apache.flink.runtime.messages.TaskManagerMessages; @@ -71,28 +66,6 @@ public class ActorTaskManagerGateway implements TaskManagerGateway { } @Override - public void disconnectFromJobManager(InstanceID instanceId, Exception cause) { - actorGateway.tell(new Messages.Disconnect(instanceId, cause)); - } - - @Override - public void stopCluster(final ApplicationStatus applicationStatus, final String message) { - actorGateway.tell(new StopCluster(applicationStatus, message)); - } - - @Override - public CompletableFuture<StackTrace> requestStackTrace(final Time timeout) { - Preconditions.checkNotNull(timeout); - - scala.concurrent.Future<StackTrace> stackTraceFuture = actorGateway.ask( - TaskManagerMessages.SendStackTrace$.MODULE$.get(), - new FiniteDuration(timeout.getSize(), timeout.getUnit())) - .mapTo(ClassTag$.MODULE$.<StackTrace>apply(StackTrace.class)); - - return FutureUtils.toJava(stackTraceFuture); - } - - @Override public CompletableFuture<StackTraceSampleResponse> requestStackTraceSample( ExecutionAttemptID executionAttemptID, int sampleId, @@ -210,16 +183,6 @@ public class ActorTaskManagerGateway implements TaskManagerGateway { } @Override - public CompletableFuture<TransientBlobKey> requestTaskManagerLog(Time timeout) { - return requestTaskManagerLog((TaskManagerMessages.RequestTaskManagerLog) TaskManagerMessages.getRequestTaskManagerLog(), timeout); - } - - @Override - public CompletableFuture<TransientBlobKey> requestTaskManagerStdout(Time timeout) { - return requestTaskManagerLog((TaskManagerMessages.RequestTaskManagerLog) TaskManagerMessages.getRequestTaskManagerStdout(), timeout); - } - - @Override public CompletableFuture<Acknowledge> freeSlot(AllocationID allocationId, Throwable cause, Time timeout) { throw new UnsupportedOperationException("The old TaskManager does not support freeing slots"); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java index b2aca32..1a8eda5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java @@ -20,16 +20,12 @@ package org.apache.flink.runtime.jobmanager.slots; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.blob.TransientBlobKey; import org.apache.flink.runtime.checkpoint.CheckpointOptions; -import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.PartitionInfo; -import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.messages.StackTrace; import org.apache.flink.runtime.messages.StackTraceSampleResponse; import org.apache.flink.runtime.rpc.RpcTimeout; @@ -48,30 +44,6 @@ public interface TaskManagerGateway { String getAddress(); /** - * Disconnect the task manager from the job manager. - * - * @param instanceId identifying the task manager - * @param cause of the disconnection - */ - void disconnectFromJobManager(InstanceID instanceId, Exception cause); - - /** - * Stop the cluster. - * - * @param applicationStatus to stop the cluster with - * @param message to deliver - */ - void stopCluster(final ApplicationStatus applicationStatus, final String message); - - /** - * Request the stack trace from the task manager. - * - * @param timeout for the stack trace request - * @return Future for a stack trace - */ - CompletableFuture<StackTrace> requestStackTrace(final Time timeout); - - /** * Request a stack trace sample from the given task. * * @param executionAttemptID identifying the task to sample @@ -174,22 +146,6 @@ public interface TaskManagerGateway { CheckpointOptions checkpointOptions); /** - * Request the task manager log from the task manager. - * - * @param timeout for the request - * @return Future blob key under which the task manager log has been stored - */ - CompletableFuture<TransientBlobKey> requestTaskManagerLog(final Time timeout); - - /** - * Request the task manager stdout from the task manager. - * - * @param timeout for the request - * @return Future blob key under which the task manager stdout file has been stored - */ - CompletableFuture<TransientBlobKey> requestTaskManagerStdout(final Time timeout); - - /** * Frees the slot with the given allocation ID. * * @param allocationId identifying the slot to free diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java index 0648be0..de19cb5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java @@ -20,19 +20,14 @@ package org.apache.flink.runtime.jobmaster; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.blob.TransientBlobKey; import org.apache.flink.runtime.checkpoint.CheckpointOptions; -import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.PartitionInfo; -import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.messages.StackTrace; import org.apache.flink.runtime.messages.StackTraceSampleResponse; -import org.apache.flink.runtime.taskexecutor.FileType; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.apache.flink.util.Preconditions; @@ -58,24 +53,6 @@ public class RpcTaskManagerGateway implements TaskManagerGateway { } @Override - public void disconnectFromJobManager(InstanceID instanceId, Exception cause) { -// taskExecutorGateway.disconnectFromJobManager(instanceId, cause); - throw new UnsupportedOperationException("Operation is not yet supported."); - } - - @Override - public void stopCluster(ApplicationStatus applicationStatus, String message) { -// taskExecutorGateway.stopCluster(applicationStatus, message); - throw new UnsupportedOperationException("Operation is not yet supported."); - } - - @Override - public CompletableFuture<StackTrace> requestStackTrace(Time timeout) { -// return taskExecutorGateway.requestStackTrace(timeout); - throw new UnsupportedOperationException("Operation is not yet supported."); - } - - @Override public CompletableFuture<StackTraceSampleResponse> requestStackTraceSample( ExecutionAttemptID executionAttemptID, int sampleId, @@ -133,16 +110,6 @@ public class RpcTaskManagerGateway implements TaskManagerGateway { } @Override - public CompletableFuture<TransientBlobKey> requestTaskManagerLog(Time timeout) { - return taskExecutorGateway.requestFileUpload(FileType.LOG, timeout); - } - - @Override - public CompletableFuture<TransientBlobKey> requestTaskManagerStdout(Time timeout) { - return taskExecutorGateway.requestFileUpload(FileType.STDOUT, timeout); - } - - @Override public CompletableFuture<Acknowledge> freeSlot(AllocationID allocationId, Throwable cause, Time timeout) { return taskExecutorGateway.freeSlot( allocationId, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java index e53b480..0fd8820 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java @@ -20,26 +20,19 @@ package org.apache.flink.runtime.executiongraph.utils; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.blob.TransientBlobKey; import org.apache.flink.runtime.checkpoint.CheckpointOptions; -import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.PartitionInfo; -import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.messages.StackTrace; import org.apache.flink.runtime.messages.StackTraceSampleResponse; -import javax.annotation.Nonnull; - import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -57,9 +50,6 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway { private volatile BiFunction<AllocationID, Throwable, CompletableFuture<Acknowledge>> freeSlotFunction; - @Nonnull - private volatile BiConsumer<InstanceID, Exception> disconnectFromJobManagerConsumer = (ignoredA, ignoredB) -> {}; - public SimpleAckingTaskManagerGateway() { optSubmitConsumer = Optional.empty(); optCancelConsumer = Optional.empty(); @@ -77,29 +67,12 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway { this.freeSlotFunction = freeSlotFunction; } - public void setDisconnectFromJobManagerConsumer(@Nonnull BiConsumer<InstanceID, Exception> disconnectFromJobManagerConsumer) { - this.disconnectFromJobManagerConsumer = disconnectFromJobManagerConsumer; - } - @Override public String getAddress() { return address; } @Override - public void disconnectFromJobManager(InstanceID instanceId, Exception cause) { - disconnectFromJobManagerConsumer.accept(instanceId, cause); - } - - @Override - public void stopCluster(ApplicationStatus applicationStatus, String message) {} - - @Override - public CompletableFuture<StackTrace> requestStackTrace(Time timeout) { - return FutureUtils.completedExceptionally(new UnsupportedOperationException()); - } - - @Override public CompletableFuture<StackTraceSampleResponse> requestStackTraceSample( ExecutionAttemptID executionAttemptID, int sampleId, @@ -151,16 +124,6 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway { CheckpointOptions checkpointOptions) {} @Override - public CompletableFuture<TransientBlobKey> requestTaskManagerLog(Time timeout) { - return FutureUtils.completedExceptionally(new UnsupportedOperationException()); - } - - @Override - public CompletableFuture<TransientBlobKey> requestTaskManagerStdout(Time timeout) { - return FutureUtils.completedExceptionally(new UnsupportedOperationException()); - } - - @Override public CompletableFuture<Acknowledge> freeSlot(AllocationID allocationId, Throwable cause, Time timeout) { final BiFunction<AllocationID, Throwable, CompletableFuture<Acknowledge>> currentFreeSlotFunction = freeSlotFunction;