http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java index 9113fcd..9a0479f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java @@ -55,7 +55,11 @@ public final class BlobClient implements Closeable { public BlobClient(final InetSocketAddress serverAddress) throws IOException { this.socket = new Socket(); - this.socket.connect(serverAddress); + try { + this.socket.connect(serverAddress); + }catch(IOException e){ + throw new IOException("Could not connect to BlobServer at address " + serverAddress, e); + } } /**
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java index de22e0f..0b02acb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java @@ -101,15 +101,21 @@ public final class BlobServer extends Thread implements BlobService{ */ public BlobServer() throws IOException { - this.serverSocket = new ServerSocket(0); - start(); + try { + this.serverSocket = new ServerSocket(0); + + start(); + + if (LOG.isInfoEnabled()) { + LOG.info(String.format("Started BLOB server on port %d", + this.serverSocket.getLocalPort())); + } - if (LOG.isInfoEnabled()) { - LOG.info(String.format("Started BLOB server on port %d", - this.serverSocket.getLocalPort())); + this.storageDir = BlobUtils.initStorageDirectory(); + }catch(IOException e){ + throw new IOException("Could not create BlobServer with random port.", e); } - this.storageDir = BlobUtils.initStorageDirectory(); } /** http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionListener.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionListener.java deleted file mode 100644 index bd2e118..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionListener.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.execution; - -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.jobgraph.JobID; -import org.apache.flink.runtime.jobgraph.JobVertexID; - -/** - * Implementing this interface allows classes to receive notifications about - * changes of a task's execution state. - */ -public interface ExecutionListener { - - void executionStateChanged(JobID jobID, JobVertexID vertexId, int subtask, ExecutionAttemptID executionId, - ExecutionState newExecutionState, String optionalMessage); -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 915140c..8faa235 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -38,7 +38,6 @@ import org.slf4j.LoggerFactory; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.blob.BlobKey; -import org.apache.flink.runtime.execution.ExecutionListener; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.InstanceConnectionInfo; @@ -97,10 +96,6 @@ public class ExecutionGraph { private final List<BlobKey> requiredJarFiles; - private final List<JobStatusListener> jobStatusListeners; - - private final List<ExecutionListener> executionListeners; - private final List<ActorRef> jobStatusListenerActors; private final List<ActorRef> executionListenerActors; @@ -150,8 +145,6 @@ public class ExecutionGraph { this.verticesInCreationOrder = new ArrayList<ExecutionJobVertex>(); this.currentExecutions = new ConcurrentHashMap<ExecutionAttemptID, Execution>(); - this.jobStatusListeners = new CopyOnWriteArrayList<JobStatusListener>(); - this.executionListeners = new CopyOnWriteArrayList<ExecutionListener>(); this.jobStatusListenerActors = new CopyOnWriteArrayList<ActorRef>(); this.executionListenerActors = new CopyOnWriteArrayList<ActorRef>(); @@ -638,14 +631,6 @@ public class ExecutionGraph { // Listeners & Observers // -------------------------------------------------------------------------------------------- - public void registerJobStatusListener(JobStatusListener jobStatusListener) { - this.jobStatusListeners.add(jobStatusListener); - } - - public void registerExecutionListener(ExecutionListener executionListener) { - this.executionListeners.add(executionListener); - } - public void registerJobStatusListener(ActorRef listener){ this.jobStatusListenerActors.add(listener); @@ -662,20 +647,6 @@ public class ExecutionGraph { * @param error */ private void notifyJobStatusChange(JobStatus newState, Throwable error) { - if (jobStatusListeners.size() > 0) { - - String message = error == null ? null : ExceptionUtils.stringifyException(error); - - for (JobStatusListener listener : this.jobStatusListeners) { - try { - listener.jobStatusHasChanged(this, newState, message); - } - catch (Throwable t) { - LOG.error("Notification of job status change caused an error.", t); - } - } - } - if(jobStatusListenerActors.size() > 0){ String message = error == null ? null : ExceptionUtils.stringifyException(error); for(ActorRef listener: jobStatusListenerActors){ @@ -696,17 +667,6 @@ public class ExecutionGraph { */ void notifyExecutionChange(JobVertexID vertexId, int subtask, ExecutionAttemptID executionID, ExecutionState newExecutionState, Throwable error) { - if(executionListeners.size() >0){ - String message = error == null ? null : ExceptionUtils.stringifyException(error); - for (ExecutionListener listener : this.executionListeners) { - try { - listener.executionStateChanged(jobID, vertexId, subtask,executionID, newExecutionState, message); - }catch(Throwable t){ - LOG.error("Notification of execution state change caused an error."); - } - } - } - ExecutionJobVertex vertex = getJobVertex(vertexId); if(executionListenerActors.size() >0){ http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusListener.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusListener.java deleted file mode 100644 index b06d2b2..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusListener.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.executiongraph; - -import org.apache.flink.runtime.jobgraph.JobStatus; - -/** - * This interface allows objects to receive notifications when the status of an observed job has changed. - */ -public interface JobStatusListener { - - /** - * Called when the status of the job changed. - * - * @param executionGraph The executionGraph representing the job. - * @param newJobStatus The new job status. - * @param optionalMessage An optional message (possibly <code>null</code>) that can be attached to the state change. - */ - void jobStatusHasChanged(ExecutionGraph executionGraph, JobStatus newJobStatus, String optionalMessage); -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/TaskManagerProfiler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/TaskManagerProfiler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/TaskManagerProfiler.java index a7f1bd2..709a05c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/TaskManagerProfiler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/TaskManagerProfiler.java @@ -29,23 +29,24 @@ import org.apache.flink.runtime.taskmanager.Task; public interface TaskManagerProfiler { /** - * Registers an {@link org.apache.flink.runtime.execution.ExecutionListener} object for profiling. + * Registers a {@link org.apache.flink.runtime.taskmanager.Task} object for profiling. * * @param task * task to be register a profiling listener for * @param jobConfiguration * the job configuration sent with the task */ - void registerExecutionListener(Task task, Configuration jobConfiguration); + void registerTask(Task task, Configuration jobConfiguration); /** - * Unregisters all previously register {@link org.apache.flink.runtime.execution.ExecutionListener} objects for - * the vertex identified by the given ID. + * Unregisters all previously registered {@link org.apache.flink.runtime.taskmanager.Task} + * objects for the vertex identified by the given ID. * * @param id - * the ID of the vertex to unregister the {@link org.apache.flink.runtime.execution.ExecutionListener} objects for + * the ID of the vertex to unregister the + * {@link org.apache.flink.runtime.taskmanager.Task} objects for */ - void unregisterExecutionListener(ExecutionAttemptID id); + void unregisterTask(ExecutionAttemptID id); /** * Shuts done the task manager's profiling component http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 1903a3c..8570992 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -27,7 +27,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.execution.ExecutionListener; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.RuntimeEnvironment; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; @@ -62,9 +61,6 @@ public final class Task { private final String taskName; private final TaskManager taskManager; - - - private final List<ExecutionListener> executionListeners = new CopyOnWriteArrayList<ExecutionListener>(); private final List<ActorRef> executionListenerActors = new CopyOnWriteArrayList<ActorRef>(); @@ -354,7 +350,7 @@ public final class Task { * the configuration attached to the job */ public void registerProfiler(TaskManagerProfiler taskManagerProfiler, Configuration jobConfiguration) { - taskManagerProfiler.registerExecutionListener(this, jobConfiguration); + taskManagerProfiler.registerTask(this, jobConfiguration); } /** @@ -365,7 +361,7 @@ public final class Task { */ public void unregisterProfiler(TaskManagerProfiler taskManagerProfiler) { if (taskManagerProfiler != null) { - taskManagerProfiler.unregisterExecutionListener(this.executionId); + taskManagerProfiler.unregisterTask(this.executionId); } } @@ -373,24 +369,10 @@ public final class Task { // State Listeners // -------------------------------------------------------------------------------------------- - public void registerExecutionListener(ExecutionListener listener) { - if (listener == null) { - throw new IllegalArgumentException(); - } - this.executionListeners.add(listener); - } - public void registerExecutionListener(ActorRef listener){ executionListenerActors.add(listener); } - public void unregisterExecutionListener(ExecutionListener listener) { - if (listener == null) { - throw new IllegalArgumentException(); - } - this.executionListeners.remove(listener); - } - public void unregisterExecutionListener(ActorRef listener){ executionListenerActors.remove(listener); } @@ -400,15 +382,6 @@ public final class Task { LOG.info(getTaskNameWithSubtasks() + " switched to " + newState + (message == null ? "" : " : " + message)); } - for (ExecutionListener listener : this.executionListeners) { - try { - listener.executionStateChanged(jobId, vertexId, subtaskIndex, executionId, newState, message); - } - catch (Throwable t) { - LOG.error("Error while calling execution listener.", t); - } - } - for(ActorRef listener: executionListenerActors){ listener.tell(new ExecutionGraphMessages.ExecutionStateChanged( jobId, vertexId, taskName, numberOfSubtasks, subtaskIndex,
