This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f3758f3d2f898706bd3cb40699e3df75e34f112d
Author: Till Rohrmann <trohrm...@apache.org>
AuthorDate: Tue Feb 19 11:53:00 2019 +0100

    [hotfix] Remove unused methods from TaskManagerGateway
---
 .../jobmanager/slots/ActorTaskManagerGateway.java  | 37 ------------------
 .../jobmanager/slots/TaskManagerGateway.java       | 44 ----------------------
 .../runtime/jobmaster/RpcTaskManagerGateway.java   | 33 ----------------
 .../utils/SimpleAckingTaskManagerGateway.java      | 37 ------------------
 4 files changed, 151 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
index 6b88752..54c11ff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
@@ -22,18 +22,13 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.blob.TransientBlobKey;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.clusterframework.messages.StopCluster;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.PartitionInfo;
 import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.messages.Messages;
-import org.apache.flink.runtime.messages.StackTrace;
 import org.apache.flink.runtime.messages.StackTraceSampleMessages;
 import org.apache.flink.runtime.messages.StackTraceSampleResponse;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
@@ -71,28 +66,6 @@ public class ActorTaskManagerGateway implements TaskManagerGateway {
        }
 
        @Override
-       public void disconnectFromJobManager(InstanceID instanceId, Exception cause) {
-               actorGateway.tell(new Messages.Disconnect(instanceId, cause));
-       }
-
-       @Override
-       public void stopCluster(final ApplicationStatus applicationStatus, final String message) {
-               actorGateway.tell(new StopCluster(applicationStatus, message));
-       }
-
-       @Override
-       public CompletableFuture<StackTrace> requestStackTrace(final Time timeout) {
-               Preconditions.checkNotNull(timeout);
-
-               scala.concurrent.Future<StackTrace> stackTraceFuture = actorGateway.ask(
-                       TaskManagerMessages.SendStackTrace$.MODULE$.get(),
-                       new FiniteDuration(timeout.getSize(), timeout.getUnit()))
-                       .mapTo(ClassTag$.MODULE$.<StackTrace>apply(StackTrace.class));
-
-               return FutureUtils.toJava(stackTraceFuture);
-       }
-
-       @Override
        public CompletableFuture<StackTraceSampleResponse> requestStackTraceSample(
                        ExecutionAttemptID executionAttemptID,
                        int sampleId,
@@ -210,16 +183,6 @@ public class ActorTaskManagerGateway implements TaskManagerGateway {
        }
 
        @Override
-       public CompletableFuture<TransientBlobKey> requestTaskManagerLog(Time timeout) {
-               return requestTaskManagerLog((TaskManagerMessages.RequestTaskManagerLog) TaskManagerMessages.getRequestTaskManagerLog(), timeout);
-       }
-
-       @Override
-       public CompletableFuture<TransientBlobKey> requestTaskManagerStdout(Time timeout) {
-               return requestTaskManagerLog((TaskManagerMessages.RequestTaskManagerLog) TaskManagerMessages.getRequestTaskManagerStdout(), timeout);
-       }
-
-       @Override
        public CompletableFuture<Acknowledge> freeSlot(AllocationID allocationId, Throwable cause, Time timeout) {
                throw new UnsupportedOperationException("The old TaskManager does not support freeing slots");
        }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
index b2aca32..1a8eda5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
@@ -20,16 +20,12 @@ package org.apache.flink.runtime.jobmanager.slots;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.blob.TransientBlobKey;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.PartitionInfo;
-import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.messages.StackTrace;
 import org.apache.flink.runtime.messages.StackTraceSampleResponse;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 
@@ -48,30 +44,6 @@ public interface TaskManagerGateway {
        String getAddress();
 
        /**
-        * Disconnect the task manager from the job manager.
-        *
-        * @param instanceId identifying the task manager
-        * @param cause of the disconnection
-        */
-       void disconnectFromJobManager(InstanceID instanceId, Exception cause);
-
-       /**
-        * Stop the cluster.
-        *
-        * @param applicationStatus to stop the cluster with
-        * @param message to deliver
-        */
-       void stopCluster(final ApplicationStatus applicationStatus, final String message);
-
-       /**
-        * Request the stack trace from the task manager.
-        *
-        * @param timeout for the stack trace request
-        * @return Future for a stack trace
-        */
-       CompletableFuture<StackTrace> requestStackTrace(final Time timeout);
-
-       /**
         * Request a stack trace sample from the given task.
         *
         * @param executionAttemptID identifying the task to sample
@@ -174,22 +146,6 @@ public interface TaskManagerGateway {
                CheckpointOptions checkpointOptions);
 
        /**
-        * Request the task manager log from the task manager.
-        *
-        * @param timeout for the request
-        * @return Future blob key under which the task manager log has been stored
-        */
-       CompletableFuture<TransientBlobKey> requestTaskManagerLog(final Time timeout);
-
-       /**
-        * Request the task manager stdout from the task manager.
-        *
-        * @param timeout for the request
-        * @return Future blob key under which the task manager stdout file has been stored
-        */
-       CompletableFuture<TransientBlobKey> requestTaskManagerStdout(final Time timeout);
-
-       /**
         * Frees the slot with the given allocation ID.
         *
         * @param allocationId identifying the slot to free
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
index 0648be0..de19cb5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
@@ -20,19 +20,14 @@ package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.blob.TransientBlobKey;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.PartitionInfo;
-import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.messages.StackTrace;
 import org.apache.flink.runtime.messages.StackTraceSampleResponse;
-import org.apache.flink.runtime.taskexecutor.FileType;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.util.Preconditions;
 
@@ -58,24 +53,6 @@ public class RpcTaskManagerGateway implements TaskManagerGateway {
        }
 
        @Override
-       public void disconnectFromJobManager(InstanceID instanceId, Exception cause) {
-//             taskExecutorGateway.disconnectFromJobManager(instanceId, cause);
-               throw new UnsupportedOperationException("Operation is not yet supported.");
-       }
-
-       @Override
-       public void stopCluster(ApplicationStatus applicationStatus, String message) {
-//             taskExecutorGateway.stopCluster(applicationStatus, message);
-               throw new UnsupportedOperationException("Operation is not yet supported.");
-       }
-
-       @Override
-       public CompletableFuture<StackTrace> requestStackTrace(Time timeout) {
-//             return taskExecutorGateway.requestStackTrace(timeout);
-               throw new UnsupportedOperationException("Operation is not yet supported.");
-       }
-
-       @Override
        public CompletableFuture<StackTraceSampleResponse> requestStackTraceSample(
                        ExecutionAttemptID executionAttemptID,
                        int sampleId,
@@ -133,16 +110,6 @@ public class RpcTaskManagerGateway implements TaskManagerGateway {
        }
 
        @Override
-       public CompletableFuture<TransientBlobKey> requestTaskManagerLog(Time timeout) {
-               return taskExecutorGateway.requestFileUpload(FileType.LOG, timeout);
-       }
-
-       @Override
-       public CompletableFuture<TransientBlobKey> requestTaskManagerStdout(Time timeout) {
-               return taskExecutorGateway.requestFileUpload(FileType.STDOUT, timeout);
-       }
-
-       @Override
        public CompletableFuture<Acknowledge> freeSlot(AllocationID allocationId, Throwable cause, Time timeout) {
                return taskExecutorGateway.freeSlot(
                        allocationId,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
index e53b480..0fd8820 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
@@ -20,26 +20,19 @@ package org.apache.flink.runtime.executiongraph.utils;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.blob.TransientBlobKey;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.PartitionInfo;
-import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.messages.StackTrace;
 import org.apache.flink.runtime.messages.StackTraceSampleResponse;
 
-import javax.annotation.Nonnull;
-
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
 
@@ -57,9 +50,6 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
 
        private volatile BiFunction<AllocationID, Throwable, CompletableFuture<Acknowledge>> freeSlotFunction;
 
-       @Nonnull
-       private volatile BiConsumer<InstanceID, Exception> disconnectFromJobManagerConsumer = (ignoredA, ignoredB) -> {};
-
        public SimpleAckingTaskManagerGateway() {
                optSubmitConsumer = Optional.empty();
                optCancelConsumer = Optional.empty();
@@ -77,29 +67,12 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
                this.freeSlotFunction = freeSlotFunction;
        }
 
-       public void setDisconnectFromJobManagerConsumer(@Nonnull BiConsumer<InstanceID, Exception> disconnectFromJobManagerConsumer) {
-               this.disconnectFromJobManagerConsumer = disconnectFromJobManagerConsumer;
-       }
-
        @Override
        public String getAddress() {
                return address;
        }
 
        @Override
-       public void disconnectFromJobManager(InstanceID instanceId, Exception cause) {
-               disconnectFromJobManagerConsumer.accept(instanceId, cause);
-       }
-
-       @Override
-       public void stopCluster(ApplicationStatus applicationStatus, String message) {}
-
-       @Override
-       public CompletableFuture<StackTrace> requestStackTrace(Time timeout) {
-               return FutureUtils.completedExceptionally(new UnsupportedOperationException());
-       }
-
-       @Override
        public CompletableFuture<StackTraceSampleResponse> requestStackTraceSample(
                        ExecutionAttemptID executionAttemptID,
                        int sampleId,
@@ -151,16 +124,6 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
                        CheckpointOptions checkpointOptions) {}
 
        @Override
-       public CompletableFuture<TransientBlobKey> requestTaskManagerLog(Time timeout) {
-               return FutureUtils.completedExceptionally(new UnsupportedOperationException());
-       }
-
-       @Override
-       public CompletableFuture<TransientBlobKey> requestTaskManagerStdout(Time timeout) {
-               return FutureUtils.completedExceptionally(new UnsupportedOperationException());
-       }
-
-       @Override
        public CompletableFuture<Acknowledge> freeSlot(AllocationID allocationId, Throwable cause, Time timeout) {
                final BiFunction<AllocationID, Throwable, CompletableFuture<Acknowledge>> currentFreeSlotFunction = freeSlotFunction;
 

Reply via email to