This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 21cff1d00d4866614c005e005000b4e8ad783142
Author: Gary Yao <g...@apache.org>
AuthorDate: Wed Mar 13 18:50:06 2019 +0100

    [hotfix][runtime] Delete unused interface ExecutionStatusListener
    
    This closes #7977.
---
 .../runtime/executiongraph/ExecutionGraph.java     | 27 ---------
 .../executiongraph/ExecutionStatusListener.java    | 54 -----------------
 .../executiongraph/StatusListenerMessenger.java    | 70 ----------------------
 .../runtime/messages/ExecutionGraphMessages.scala  | 20 -------
 4 files changed, 171 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 93e54e9..5e44e94 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -212,9 +212,6 @@ public class ExecutionGraph implements AccessExecutionGraph 
{
         * (such as from RUNNING to FINISHED). */
        private final List<JobStatusListener> jobStatusListeners;
 
-       /** Listeners that receive messages whenever a single task execution 
changes its status. */
-       private final List<ExecutionStatusListener> executionListeners;
-
        /** The implementation that decides how to recover the failures of 
tasks. */
        private final FailoverStrategy failoverStrategy;
 
@@ -409,7 +406,6 @@ public class ExecutionGraph implements AccessExecutionGraph 
{
                this.currentExecutions = new ConcurrentHashMap<>(16);
 
                this.jobStatusListeners  = new CopyOnWriteArrayList<>();
-               this.executionListeners = new CopyOnWriteArrayList<>();
 
                this.stateTimestamps = new long[JobStatus.values().length];
                this.stateTimestamps[JobStatus.CREATED.ordinal()] = 
System.currentTimeMillis();
@@ -1758,12 +1754,6 @@ public class ExecutionGraph implements 
AccessExecutionGraph {
                }
        }
 
-       public void registerExecutionListener(ExecutionStatusListener listener) 
{
-               if (listener != null) {
-                       executionListeners.add(listener);
-               }
-       }
-
        private void notifyJobStatusChange(JobStatus newState, Throwable error) 
{
                if (jobStatusListeners.size() > 0) {
                        final long timestamp = System.currentTimeMillis();
@@ -1784,23 +1774,6 @@ public class ExecutionGraph implements 
AccessExecutionGraph {
                        final ExecutionState newExecutionState,
                        final Throwable error) {
 
-               if (executionListeners.size() > 0) {
-                       final ExecutionJobVertex vertex = 
execution.getVertex().getJobVertex();
-                       final String message = error == null ? null : 
ExceptionUtils.stringifyException(error);
-                       final long timestamp = System.currentTimeMillis();
-
-                       for (ExecutionStatusListener listener : 
executionListeners) {
-                               try {
-                                       listener.executionStatusChanged(
-                                                       getJobID(), 
vertex.getJobVertexId(), vertex.getJobVertex().getName(),
-                                                       
vertex.getParallelism(), execution.getParallelSubtaskIndex(),
-                                                       
execution.getAttemptId(), newExecutionState, timestamp, message);
-                               } catch (Throwable t) {
-                                       LOG.warn("Error while notifying 
ExecutionStatusListener", t);
-                               }
-                       }
-               }
-
                // see what this means for us. currently, the first FAILED 
state means -> FAILED
                if (newExecutionState == ExecutionState.FAILED) {
                        final Throwable ex = error != null ? error : new 
FlinkException("Unknown Error (missing cause)");
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionStatusListener.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionStatusListener.java
deleted file mode 100644
index 6fb5a1a..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionStatusListener.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-
-/**
- * Interface for observers that monitor the status of individual task 
executions.
- */
-public interface ExecutionStatusListener {
-
-       /**
-        * Called whenever the execution status of a task changes.
-        * 
-        * @param jobID                  The ID of the job
-        * @param vertexID               The ID of the task vertex
-        * @param taskName               The name of the task
-        * @param totalNumberOfSubTasks  The parallelism of the task
-        * @param subtaskIndex           The subtask's parallel index
-        * @param executionID            The ID of the execution attempt
-        * @param newExecutionState      The status to which the task switched
-        * @param timestamp              The timestamp when the change 
occurred. Informational only.
-        * @param optionalMessage        An optional message attached to the 
status change, like an
-        *                               exception message.
-        */
-       void executionStatusChanged(
-                       JobID jobID,
-                       JobVertexID vertexID,
-                       String taskName,
-                       int totalNumberOfSubTasks,
-                       int subtaskIndex,
-                       ExecutionAttemptID executionID,
-                       ExecutionState newExecutionState,
-                       long timestamp,
-                       String optionalMessage);
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/StatusListenerMessenger.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/StatusListenerMessenger.java
deleted file mode 100644
index ea69c44..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/StatusListenerMessenger.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import akka.actor.ActorRef;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.messages.ExecutionGraphMessages;
-import org.apache.flink.util.SerializedThrowable;
-
-import java.util.UUID;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A {@code JobStatusListener} and {@code ExecutionStatusListener} that sends 
an actor message
- * for each status change.
- */
-public class StatusListenerMessenger implements JobStatusListener, 
ExecutionStatusListener {
-
-       private final AkkaActorGateway target;
-
-       public StatusListenerMessenger(ActorRef target, UUID leaderSessionId) {
-               this.target = new AkkaActorGateway(checkNotNull(target), 
leaderSessionId);
-       }
-
-       @Override
-       public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long 
timestamp, Throwable error) {
-               ExecutionGraphMessages.JobStatusChanged message =
-                               new 
ExecutionGraphMessages.JobStatusChanged(jobId, newJobStatus, timestamp,
-                                               error == null ? null : new 
SerializedThrowable(error));
-
-               target.tell(message);
-       }
-
-       @Override
-       public void executionStatusChanged(
-                       JobID jobID, JobVertexID vertexID,
-                       String taskName, int taskParallelism, int subtaskIndex,
-                       ExecutionAttemptID executionID, ExecutionState 
newExecutionState,
-                       long timestamp, String optionalMessage) {
-               
-               ExecutionGraphMessages.ExecutionStateChanged message = 
-                               new 
ExecutionGraphMessages.ExecutionStateChanged(
-                                       jobID, vertexID, taskName, 
taskParallelism, subtaskIndex,
-                                       executionID, newExecutionState, 
timestamp, optionalMessage);
-
-               target.tell(message);
-       }
-}
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
index 2369d3c..59efa73 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
@@ -73,26 +73,6 @@ object ExecutionGraphMessages {
     }
   }
 
-  /**
-   * Denotes the job state change of a job.
-   *
-   * @param jobID identifying the corresponding job
-   * @param newJobStatus
-   * @param timestamp
-   * @param error
-   */
-  case class JobStatusChanged(
-      jobID: JobID,
-      newJobStatus: JobStatus,
-      timestamp: Long,
-      error: Throwable)
-    extends RequiresLeaderSessionID {
-    
-    override def toString: String = {
-      s"${timestampToString(timestamp)}\tJob execution switched to status 
$newJobStatus."
-    }
-  }
-
   // --------------------------------------------------------------------------
   //  Utilities
   // --------------------------------------------------------------------------

Reply via email to