http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
index 9113fcd..9a0479f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
@@ -55,7 +55,11 @@ public final class BlobClient implements Closeable {
        public BlobClient(final InetSocketAddress serverAddress) throws 
IOException {
 
                this.socket = new Socket();
-               this.socket.connect(serverAddress);
+               try {
+                       this.socket.connect(serverAddress);
+               }catch(IOException e){
+                       throw new IOException("Could not connect to BlobServer 
at address " + serverAddress, e);
+               }
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index de22e0f..0b02acb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -101,15 +101,21 @@ public final class BlobServer extends Thread implements BlobService{
         */
        public BlobServer() throws IOException {
 
-               this.serverSocket = new ServerSocket(0);
-               start();
+               try {
+                       this.serverSocket = new ServerSocket(0);
+
+                       start();
+
+                       if (LOG.isInfoEnabled()) {
+                               LOG.info(String.format("Started BLOB server on 
port %d",
+                                               
this.serverSocket.getLocalPort()));
+                       }
 
-               if (LOG.isInfoEnabled()) {
-                       LOG.info(String.format("Started BLOB server on port %d",
-                               this.serverSocket.getLocalPort()));
+                       this.storageDir = BlobUtils.initStorageDirectory();
+               }catch(IOException e){
+                       throw new IOException("Could not create BlobServer with 
random port.", e);
                }
 
-               this.storageDir = BlobUtils.initStorageDirectory();
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionListener.java
deleted file mode 100644
index bd2e118..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionListener.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.execution;
-
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.jobgraph.JobID;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-
-/**
- * Implementing this interface allows classes to receive notifications about
- * changes of a task's execution state.
- */
-public interface ExecutionListener {
-
-       void executionStateChanged(JobID jobID, JobVertexID vertexId, int 
subtask, ExecutionAttemptID executionId,
-                       ExecutionState newExecutionState, String 
optionalMessage);
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 915140c..8faa235 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -38,7 +38,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.BlobKey;
-import org.apache.flink.runtime.execution.ExecutionListener;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
@@ -97,10 +96,6 @@ public class ExecutionGraph {
        
        private final List<BlobKey> requiredJarFiles;
        
-       private final List<JobStatusListener> jobStatusListeners;
-       
-       private final List<ExecutionListener> executionListeners;
-
        private final List<ActorRef> jobStatusListenerActors;
 
        private final List<ActorRef> executionListenerActors;
@@ -150,8 +145,6 @@ public class ExecutionGraph {
                this.verticesInCreationOrder = new 
ArrayList<ExecutionJobVertex>();
                this.currentExecutions = new 
ConcurrentHashMap<ExecutionAttemptID, Execution>();
                
-               this.jobStatusListeners = new 
CopyOnWriteArrayList<JobStatusListener>();
-               this.executionListeners = new 
CopyOnWriteArrayList<ExecutionListener>();
                this.jobStatusListenerActors  = new 
CopyOnWriteArrayList<ActorRef>();
                this.executionListenerActors = new 
CopyOnWriteArrayList<ActorRef>();
                
@@ -638,14 +631,6 @@ public class ExecutionGraph {
        //  Listeners & Observers
        // 
--------------------------------------------------------------------------------------------
        
-       public void registerJobStatusListener(JobStatusListener 
jobStatusListener) {
-               this.jobStatusListeners.add(jobStatusListener);
-       }
-       
-       public void registerExecutionListener(ExecutionListener 
executionListener) {
-               this.executionListeners.add(executionListener);
-       }
-
        public void registerJobStatusListener(ActorRef listener){
                this.jobStatusListenerActors.add(listener);
 
@@ -662,20 +647,6 @@ public class ExecutionGraph {
         * @param error
         */
        private void notifyJobStatusChange(JobStatus newState, Throwable error) 
{
-               if (jobStatusListeners.size() > 0) {
-                       
-                       String message = error == null ? null : 
ExceptionUtils.stringifyException(error);
-               
-                       for (JobStatusListener listener : 
this.jobStatusListeners) {
-                               try {
-                                       listener.jobStatusHasChanged(this, 
newState, message);
-                               }
-                               catch (Throwable t) {
-                                       LOG.error("Notification of job status 
change caused an error.", t);
-                               }
-                       }
-               }
-
                if(jobStatusListenerActors.size() > 0){
                        String message = error == null ? null : 
ExceptionUtils.stringifyException(error);
                        for(ActorRef listener: jobStatusListenerActors){
@@ -696,17 +667,6 @@ public class ExecutionGraph {
         */
        void notifyExecutionChange(JobVertexID vertexId, int subtask, 
ExecutionAttemptID executionID, ExecutionState
                                                        newExecutionState, 
Throwable error) {
-               if(executionListeners.size() >0){
-                       String message = error == null ? null : 
ExceptionUtils.stringifyException(error);
-                       for (ExecutionListener listener : 
this.executionListeners) {
-                               try {
-                                       listener.executionStateChanged(jobID, 
vertexId, subtask,executionID, newExecutionState, message);
-                               }catch(Throwable t){
-                                       LOG.error("Notification of execution 
state change caused an error.");
-                               }
-                       }
-               }
-
                ExecutionJobVertex vertex = getJobVertex(vertexId);
 
                if(executionListenerActors.size() >0){

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusListener.java
deleted file mode 100644
index b06d2b2..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusListener.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.runtime.jobgraph.JobStatus;
-
-/**
- * This interface allows objects to receive notifications when the status of 
an observed job has changed.
- */
-public interface JobStatusListener {
-
-       /**
-        * Called when the status of the job changed.
-        * 
-        * @param executionGraph   The executionGraph representing the job.
-        * @param newJobStatus     The new job status.
-        * @param optionalMessage  An optional message (possibly 
<code>null</code>) that can be attached to the state change.
-        */
-       void jobStatusHasChanged(ExecutionGraph executionGraph, JobStatus 
newJobStatus, String optionalMessage);
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/TaskManagerProfiler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/TaskManagerProfiler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/TaskManagerProfiler.java
index a7f1bd2..709a05c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/TaskManagerProfiler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/TaskManagerProfiler.java
@@ -29,23 +29,24 @@ import org.apache.flink.runtime.taskmanager.Task;
 public interface TaskManagerProfiler {
 
        /**
-        * Registers an {@link 
org.apache.flink.runtime.execution.ExecutionListener} object for profiling.
+        * Registers a {@link org.apache.flink.runtime.taskmanager.Task} object 
for profiling.
         * 
         * @param task
         *        task to be register a profiling listener for
         * @param jobConfiguration
         *        the job configuration sent with the task
         */
-       void registerExecutionListener(Task task, Configuration 
jobConfiguration);
+       void registerTask(Task task, Configuration jobConfiguration);
 
        /**
-        * Unregisters all previously register {@link 
org.apache.flink.runtime.execution.ExecutionListener} objects for
-        * the vertex identified by the given ID.
+        * Unregisters all previously registered {@link 
org.apache.flink.runtime.taskmanager.Task}
+        * objects for the vertex identified by the given ID.
         * 
         * @param id
-        *        the ID of the vertex to unregister the {@link 
org.apache.flink.runtime.execution.ExecutionListener} objects for
+        *        the ID of the vertex to unregister the
+        *        {@link org.apache.flink.runtime.taskmanager.Task} objects for
         */
-       void unregisterExecutionListener(ExecutionAttemptID id);
+       void unregisterTask(ExecutionAttemptID id);
 
        /**
         * Shuts done the task manager's profiling component

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 1903a3c..8570992 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -27,7 +27,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.execution.ExecutionListener;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.RuntimeEnvironment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -62,9 +61,6 @@ public final class Task {
        private final String taskName;
 
        private final TaskManager taskManager;
-       
-       
-       private final List<ExecutionListener> executionListeners = new 
CopyOnWriteArrayList<ExecutionListener>();
 
        private final List<ActorRef> executionListenerActors = new 
CopyOnWriteArrayList<ActorRef>();
 
@@ -354,7 +350,7 @@ public final class Task {
         *        the configuration attached to the job
         */
        public void registerProfiler(TaskManagerProfiler taskManagerProfiler, 
Configuration jobConfiguration) {
-               taskManagerProfiler.registerExecutionListener(this, 
jobConfiguration);
+               taskManagerProfiler.registerTask(this, jobConfiguration);
        }
 
        /**
@@ -365,7 +361,7 @@ public final class Task {
         */
        public void unregisterProfiler(TaskManagerProfiler taskManagerProfiler) 
{
                if (taskManagerProfiler != null) {
-                       
taskManagerProfiler.unregisterExecutionListener(this.executionId);
+                       taskManagerProfiler.unregisterTask(this.executionId);
                }
        }
        
@@ -373,24 +369,10 @@ public final class Task {
        //                                     State Listeners
        // 
--------------------------------------------------------------------------------------------
        
-       public void registerExecutionListener(ExecutionListener listener) {
-               if (listener == null) {
-                       throw new IllegalArgumentException();
-               }
-               this.executionListeners.add(listener);
-       }
-
        public void registerExecutionListener(ActorRef listener){
                executionListenerActors.add(listener);
        }
 
-       public void unregisterExecutionListener(ExecutionListener listener) {
-               if (listener == null) {
-                       throw new IllegalArgumentException();
-               }
-               this.executionListeners.remove(listener);
-       }
-
        public void unregisterExecutionListener(ActorRef listener){
                executionListenerActors.remove(listener);
        }
@@ -400,15 +382,6 @@ public final class Task {
                        LOG.info(getTaskNameWithSubtasks() + " switched to " + 
newState + (message == null ? "" : " : " + message));
                }
                
-               for (ExecutionListener listener : this.executionListeners) {
-                       try {
-                               listener.executionStateChanged(jobId, vertexId, 
subtaskIndex, executionId, newState, message);
-                       }
-                       catch (Throwable t) {
-                               LOG.error("Error while calling execution 
listener.", t);
-                       }
-               }
-
                for(ActorRef listener: executionListenerActors){
                        listener.tell(new 
ExecutionGraphMessages.ExecutionStateChanged(
                                                        jobId, vertexId, 
taskName, numberOfSubtasks, subtaskIndex,

Reply via email to