[FLINK-1969] [runtime] Remove deprecated profiler code

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fbea2da2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fbea2da2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fbea2da2

Branch: refs/heads/master
Commit: fbea2da26d01c470687a5ad217a5fd6ad1de89e4
Parents: 59faf46
Author: Stephan Ewen <se...@apache.org>
Authored: Sun May 3 14:10:35 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon May 11 21:13:44 2015 +0200

----------------------------------------------------------------------
 .../runtime/profiling/JobManagerProfiler.java   |  75 ----
 .../runtime/profiling/ProfilingException.java   |  42 --
 .../runtime/profiling/ProfilingListener.java    |  27 --
 .../flink/runtime/profiling/ProfilingUtils.java | 180 ---------
 .../runtime/profiling/TaskManagerProfiler.java  |  56 ---
 .../profiling/impl/EnvironmentThreadSet.java    | 217 ----------
 .../profiling/impl/InstanceProfiler.java        | 342 ----------------
 .../profiling/impl/JobProfilingData.java        | 158 --------
 .../InternalExecutionVertexProfilingData.java   |  90 -----
 ...ernalExecutionVertexThreadProfilingData.java |  96 -----
 .../types/InternalInstanceProfilingData.java    | 244 ------------
 .../impl/types/InternalProfilingData.java       |  23 --
 .../impl/types/ProfilingDataContainer.java      | 101 -----
 .../profiling/types/InstanceProfilingEvent.java | 399 -------------------
 .../types/InstanceSummaryProfilingEvent.java    |  89 -----
 .../runtime/profiling/types/ProfilingEvent.java | 135 -------
 .../types/SingleInstanceProfilingEvent.java     | 147 -------
 .../profiling/types/ThreadProfilingEvent.java   | 150 -------
 .../profiling/types/VertexProfilingEvent.java   | 142 -------
 .../flink/runtime/jobmanager/JobManager.scala   |  26 +-
 .../runtime/jobmanager/JobManagerProfiler.scala |  55 ---
 .../messages/JobManagerProfilerMessages.scala   |  34 --
 .../messages/TaskManagerProfilerMessages.scala  |  55 ---
 .../taskmanager/TaskManagerProfiler.scala       | 182 ---------
 .../profiling/impl/InstanceProfilerTest.java    | 182 ---------
 .../profiling/types/ProfilingTypesTest.java     | 177 --------
 .../runtime/testingUtils/TestingCluster.scala   |   4 +-
 .../test/util/ForkableFlinkMiniCluster.scala    |   4 +-
 .../apache/flink/yarn/ApplicationMaster.scala   |  10 +-
 29 files changed, 12 insertions(+), 3430 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/JobManagerProfiler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/JobManagerProfiler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/JobManagerProfiler.java
deleted file mode 100644
index 4c4a20e..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/JobManagerProfiler.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.profiling;
-
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.api.common.JobID;
-
-/**
- * This interface must be implemented by profiling components
- * for the job manager.
- * 
- */
-public interface JobManagerProfiler {
-
-       /**
-        * Registers the given {@link ExecutionGraph} for profiling.
-        * 
-        * @param executionGraph
-        *        the {@link ExecutionGraph} to register for profiling
-        */
-       void registerProfilingJob(ExecutionGraph executionGraph);
-
-       /**
-        * Unregisters the given {@link ExecutionGraph} from profiling. Calling 
this
-        * method will also unregister all of the job's registered listeners.
-        * 
-        * @param executionGraph the {@link ExecutionGraph} to unregister.
-        */
-       void unregisterProfilingJob(ExecutionGraph executionGraph);
-
-       /**
-        * Registers the given {@link ProfilingListener} object to receive
-        * profiling data for the job with the given job ID.
-        * 
-        * @param jobID
-        *        the ID of the job to receive profiling data for
-        * @param profilingListener
-        *        the {@link ProfilingListener} object to register
-        */
-       void registerForProfilingData(JobID jobID, ProfilingListener 
profilingListener);
-
-       /**
-        * Unregisters the given {@link ProfilingListener} object from receiving
-        * profiling data issued by the job manager's profiling component.
-        * 
-        * @param jobID
-        *        the ID of the job the {@link ProfilingListener} object has 
originally been registered for
-        * @param profilingListener
-        *        the {@link ProfilingListener} object to unregister
-        */
-       void unregisterFromProfilingData(JobID jobID, ProfilingListener 
profilingListener);
-
-       /**
-        * Shuts done the job manager's profiling component
-        * and stops all its internal processes.
-        */
-       void shutdown();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/ProfilingException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/ProfilingException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/ProfilingException.java
deleted file mode 100644
index f3148d9..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/ProfilingException.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.profiling;
-
-/**
- * A profiling exception is thrown if an error occur during profiling 
execution.
- * 
- */
-public class ProfilingException extends Exception {
-
-       /**
-        * Generated serialVersionUID.
-        */
-       private static final long serialVersionUID = -3282996556813630561L;
-
-       /**
-        * Constructs a new profiling exception with the given error message.
-        * 
-        * @param errorMsg
-        *        The error message to be included in the exception.
-        */
-       public ProfilingException(String errorMsg) {
-               super(errorMsg);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/ProfilingListener.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/ProfilingListener.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/ProfilingListener.java
deleted file mode 100644
index b3116ed..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/ProfilingListener.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.profiling;
-
-import org.apache.flink.runtime.profiling.types.ProfilingEvent;
-
-public interface ProfilingListener {
-
-       void processProfilingEvents(ProfilingEvent profilingEvent);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/ProfilingUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/ProfilingUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/ProfilingUtils.java
deleted file mode 100644
index 95211be..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/ProfilingUtils.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.profiling;
-
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.net.InetAddress;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
-import org.apache.flink.util.StringUtils;
-
-/**
- * This class contains utility functions to load and configure Nephele's
- * profiling component.
- * 
- */
-public class ProfilingUtils {
-
-       /**
-        * The logging instance used to report problems.
-        */
-       private static final Logger LOG = 
LoggerFactory.getLogger(ProfilingUtils.class);
-
-       /**
-        * The key to check the job manager's profiling component should be 
enabled.
-        */
-       public static final String ENABLE_PROFILING_KEY = 
"jobmanager.profiling.enable";
-
-       /**
-        * The class name of the job manager's profiling component to load if 
progiling is enabled.
-        */
-       public static final String JOBMANAGER_CLASSNAME_KEY = 
"jobmanager.profiling.classname";
-
-       /**
-        * The class name of the task manager's profiling component to load if 
profiling is enabled.
-        */
-       public static final String TASKMANAGER_CLASSNAME_KEY = 
"taskmanager.profiling.classname";
-
-       /**
-        * The key to check whether job profiling should be enabled for a 
specific job.
-        */
-       public static final String PROFILE_JOB_KEY = "job.profiling.enable";
-
-       /**
-        * The key to check the port of the job manager's profiling RPC server.
-        */
-       public static final String JOBMANAGER_RPC_PORT_KEY = 
"jobmanager.profiling.rpc.port";
-
-       /**
-        * The default network port the job manager's profiling component 
starts its RPC server on.
-        */
-       public static final int JOBMANAGER_DEFAULT_RPC_PORT = 6124;
-
-       /**
-        * Key to interval in which a task manager is supposed to send 
profiling data to the job manager.
-        */
-       public static final String TASKMANAGER_REPORTINTERVAL_KEY = 
"taskmanager.profiling.reportinterval";
-
-       /**
-        * Default interval which the task manager uses to report profiling 
data to the job manager.
-        */
-       public static final int DEFAULT_TASKMANAGER_REPORTINTERVAL = 2;
-
-       /**
-        * Creates an instance of the job manager's profiling component.
-        * 
-        * @param profilerClassName
-        *        the class name of the profiling component to load
-        * @param jobManagerBindAddress
-        *        the address the job manager's RPC server is bound to
-        * @return an instance of the job manager profiling component or 
<code>null</code> if an error occurs
-        */
-       @SuppressWarnings("unchecked")
-       public static JobManagerProfiler loadJobManagerProfiler(String 
profilerClassName, InetAddress jobManagerBindAddress) {
-
-               final Class<? extends JobManagerProfiler> profilerClass;
-               try {
-                       profilerClass = (Class<? extends JobManagerProfiler>) 
Class.forName(profilerClassName);
-               } catch (ClassNotFoundException e) {
-                       LOG.error("Cannot find class " + profilerClassName + ": 
" + StringUtils.stringifyException(e));
-                       return null;
-               }
-
-               JobManagerProfiler profiler = null;
-               
-               try {
-                       
-                       final Constructor<JobManagerProfiler> constr = 
(Constructor<JobManagerProfiler>) 
profilerClass.getConstructor(InetAddress.class);
-                       profiler = constr.newInstance(jobManagerBindAddress);
-               
-               } catch(InvocationTargetException e) {
-                       LOG.error("Cannot create profiler: " + 
StringUtils.stringifyException(e));
-                       return null;
-               } catch (NoSuchMethodException e) {
-                       LOG.error("Cannot create profiler: " + 
StringUtils.stringifyException(e));
-                       return null;
-               } catch (InstantiationException e) {
-                       LOG.error("Cannot create profiler: " + 
StringUtils.stringifyException(e));
-                       return null;
-               } catch (IllegalAccessException e) {
-                       LOG.error("Cannot create profiler: " + 
StringUtils.stringifyException(e));
-                       return null;
-               } catch (IllegalArgumentException e) {
-                       LOG.error("Cannot create profiler: " + 
StringUtils.stringifyException(e));
-                       return null;
-               }
-
-               return profiler;
-       }
-
-       /**
-        * Creates an instance of the task manager's profiling component.
-        * 
-        * @param profilerClassName
-        *        the class name of the profiling component to load
-        * @return an instance of the task manager profiling component or 
<code>null</code> if an error occurs
-        */
-       @SuppressWarnings("unchecked")
-       public static TaskManagerProfiler loadTaskManagerProfiler(String 
profilerClassName, InetAddress jobManagerAddress,
-                       InstanceConnectionInfo instanceConnectionInfo) {
-
-               final Class<? extends TaskManagerProfiler> profilerClass;
-               try {
-                       profilerClass = (Class<? extends TaskManagerProfiler>) 
Class.forName(profilerClassName);
-               } catch (ClassNotFoundException e) {
-                       LOG.error("Cannot find class " + profilerClassName + ": 
" + StringUtils.stringifyException(e));
-                       return null;
-               }
-
-               Constructor<? extends TaskManagerProfiler> constructor = null;
-               try {
-                       constructor = 
profilerClass.getConstructor(InetAddress.class, InstanceConnectionInfo.class);
-               } catch (SecurityException e1) {
-                       LOG.error("Security exception while retrieving 
constructor for class " + profilerClass.getCanonicalName()
-                                       + ".", e1);
-                       return null;
-               } catch (NoSuchMethodException e1) {
-                       LOG.error("Class " + profilerClass.getCanonicalName() + 
" does not have a constructor taking a " +
-                                       "InetAddress and InstanceConnectionInfo 
parameter.", e1);
-                       return null;
-               }
-
-               TaskManagerProfiler profiler = null;
-               try {
-                       profiler = constructor.newInstance(jobManagerAddress, 
instanceConnectionInfo);
-               } catch (IllegalArgumentException e) {
-                       LOG.error("IllegalArgumentException while creating 
object of class " + profilerClass.getCanonicalName() +
-                                                       ".", e);
-               } catch (InstantiationException e) {
-                       LOG.error("Could not instantiate object of class " + 
profilerClass.getCanonicalName() + ".",e);
-               } catch (IllegalAccessException e) {
-                       LOG.error("IllegalAccessException while creating object 
of class " + profilerClass.getCanonicalName() + ".",
-                                       e);
-               } catch (InvocationTargetException e) {
-                       LOG.error("InvocationTargetException while creating 
object of class " + profilerClass.getCanonicalName() +
-                                                       ".", e);
-               }
-
-               return profiler;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/TaskManagerProfiler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/TaskManagerProfiler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/TaskManagerProfiler.java
deleted file mode 100644
index 709a05c..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/TaskManagerProfiler.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.profiling;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.taskmanager.Task;
-
-/**
- * This interface must be implemented by profiling components
- * for the task manager manager.
- */
-public interface TaskManagerProfiler {
-
-       /**
-        * Registers a {@link org.apache.flink.runtime.taskmanager.Task} object 
for profiling.
-        * 
-        * @param task
-        *        task to be register a profiling listener for
-        * @param jobConfiguration
-        *        the job configuration sent with the task
-        */
-       void registerTask(Task task, Configuration jobConfiguration);
-
-       /**
-        * Unregisters all previously registered {@link 
org.apache.flink.runtime.taskmanager.Task}
-        * objects for the vertex identified by the given ID.
-        * 
-        * @param id
-        *        the ID of the vertex to unregister the
-        *        {@link org.apache.flink.runtime.taskmanager.Task} objects for
-        */
-       void unregisterTask(ExecutionAttemptID id);
-
-       /**
-        * Shuts done the task manager's profiling component
-        * and stops all its internal processes.
-        */
-       void shutdown();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/EnvironmentThreadSet.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/EnvironmentThreadSet.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/EnvironmentThreadSet.java
deleted file mode 100644
index d5ce137..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/EnvironmentThreadSet.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.profiling.impl;
-
-import java.lang.management.ThreadInfo;
-import java.lang.management.ThreadMXBean;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import 
org.apache.flink.runtime.profiling.impl.types.InternalExecutionVertexThreadProfilingData;
-
-public class EnvironmentThreadSet {
-
-       private static final long NANO_TO_MILLISECONDS = 1000 * 1000;
-
-       private static final long PERCENT = 100;
-
-       
-
-       private final Thread mainThread;
-
-       private final JobVertexID vertexId;
-       
-       private final int subtask;
-       
-       private final ExecutionAttemptID executionId;
-
-       private final Map<Thread, CPUUtilizationSnapshot> userThreads = new 
HashMap<Thread, CPUUtilizationSnapshot>();
-
-       private CPUUtilizationSnapshot mainThreadSnapshot;
-
-       
-       public EnvironmentThreadSet(ThreadMXBean tmx, Thread mainThread, 
JobVertexID vertexId, int subtask, ExecutionAttemptID executionId) {
-               this.mainThread = mainThread;
-               this.vertexId = vertexId;
-               this.subtask = subtask;
-               this.executionId = executionId;
-
-               this.mainThreadSnapshot = createCPUUtilizationSnapshot(tmx, 
mainThread, System.currentTimeMillis());
-       }
-
-       public Thread getMainThread() {
-               return this.mainThread;
-       }
-
-       public void addUserThread(ThreadMXBean tmx, Thread thread) {
-               synchronized (this.userThreads) {
-                       this.userThreads.put(thread, 
createCPUUtilizationSnapshot(tmx, thread, System.currentTimeMillis()));
-               }
-       }
-
-       public void removeUserThread(Thread thread) {
-               synchronized (this.userThreads) {
-                       this.userThreads.remove(thread);
-               }
-       }
-
-       public int getNumberOfUserThreads() {
-               synchronized (this.userThreads) {
-                       return this.userThreads.size();
-               }
-       }
-
-       private CPUUtilizationSnapshot 
createCPUUtilizationSnapshot(ThreadMXBean tmx, Thread thread, long timestamp) {
-               final long threadId = thread.getId();
-
-               final ThreadInfo threadInfo = tmx.getThreadInfo(threadId);
-               if(threadInfo == null) {
-                       return null;
-               }
-
-               return new CPUUtilizationSnapshot(timestamp,
-                               tmx.getThreadCpuTime(threadId) / 
NANO_TO_MILLISECONDS,
-                               tmx.getThreadUserTime(threadId) / 
NANO_TO_MILLISECONDS,
-                               threadInfo.getWaitedTime(),
-                               threadInfo.getBlockedTime());
-       }
-
-       public InternalExecutionVertexThreadProfilingData 
captureCPUUtilization(JobID jobID, ThreadMXBean tmx, long timestamp) {
-
-               synchronized (this.userThreads) {
-
-                       // Calculate utilization for main thread first
-                       final CPUUtilizationSnapshot newMainThreadSnapshot = 
createCPUUtilizationSnapshot(tmx, this.mainThread, timestamp);
-                       if(newMainThreadSnapshot == null) {
-                               return null;
-                       }
-                       
-                       final long mainInterval = 
newMainThreadSnapshot.getTimestamp() - this.mainThreadSnapshot.getTimestamp();
-
-                       if (mainInterval == 0) {
-                               return null;
-                       }
-
-                       long cputime = newMainThreadSnapshot.getTotalCPUTime() 
- this.mainThreadSnapshot.getTotalCPUTime();
-                       long usrtime = 
newMainThreadSnapshot.getTotalCPUUserTime() - 
this.mainThreadSnapshot.getTotalCPUUserTime();
-                       long systime = cputime - usrtime;
-                       long waitime = 
newMainThreadSnapshot.getTotalCPUWaitTime() - 
this.mainThreadSnapshot.getTotalCPUWaitTime();
-                       long blktime = 
newMainThreadSnapshot.getTotalCPUBlockTime()
-                               - 
this.mainThreadSnapshot.getTotalCPUBlockTime();
-
-                       int sumUsrTime = (int) ((usrtime * PERCENT) / 
mainInterval);
-                       int sumSysTime = (int) ((systime * PERCENT) / 
mainInterval);
-                       int sumBlkTime = (int) ((blktime * PERCENT) / 
mainInterval);
-                       int sumWaiTime = (int) ((waitime * PERCENT) / 
mainInterval);
-
-                       // Update snapshot
-                       this.mainThreadSnapshot = newMainThreadSnapshot;
-
-                       if (!this.userThreads.isEmpty()) {
-
-                               final Iterator<Thread> it = 
this.userThreads.keySet().iterator();
-                               int divisor = this.userThreads.size();
-                               while (it.hasNext()) {
-
-                                       final Thread userThread = it.next();
-                                       final CPUUtilizationSnapshot 
newUtilizationSnaphot = createCPUUtilizationSnapshot(tmx, userThread,
-                                               timestamp);
-                                       final CPUUtilizationSnapshot 
oldUtilizationSnapshot = this.userThreads.get(userThread);
-
-                                       long interval = 
newUtilizationSnaphot.getTimestamp() - oldUtilizationSnapshot.getTimestamp();
-
-                                       if (interval == 0) {
-                                               --divisor;
-                                               continue;
-                                       }
-
-                                       cputime = 
newUtilizationSnaphot.getTotalCPUTime() - 
oldUtilizationSnapshot.getTotalCPUTime();
-                                       usrtime = 
newUtilizationSnaphot.getTotalCPUUserTime() - 
oldUtilizationSnapshot.getTotalCPUUserTime();
-                                       systime = cputime - usrtime;
-                                       waitime = 
newUtilizationSnaphot.getTotalCPUWaitTime() - 
oldUtilizationSnapshot.getTotalCPUWaitTime();
-                                       blktime = 
newUtilizationSnaphot.getTotalCPUBlockTime() - 
oldUtilizationSnapshot.getTotalCPUBlockTime();
-
-                                       sumUsrTime += (int) ((usrtime * 
PERCENT) / interval);
-                                       sumSysTime += (int) ((systime * 
PERCENT) / interval);
-                                       sumBlkTime += (int) ((blktime * 
PERCENT) / interval);
-                                       sumWaiTime += (int) ((waitime * 
PERCENT) / interval);
-
-                                       // Update snapshot
-                                       this.userThreads.put(userThread, 
newUtilizationSnaphot);
-                               }
-
-                               sumUsrTime /= (divisor + 1);
-                               sumSysTime /= (divisor + 1);
-                               sumBlkTime /= (divisor + 1);
-                               sumWaiTime /= (divisor + 1);
-                       }
-
-                       return new 
InternalExecutionVertexThreadProfilingData(jobID, this.vertexId, this.subtask, 
this.executionId,
-                                       (int) mainInterval, sumUsrTime, 
sumSysTime, sumBlkTime, sumWaiTime);
-               }
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       private class CPUUtilizationSnapshot {
-
-               private final long timestamp;
-
-               private final long totalCPUTime;
-
-               private final long totalCPUUserTime;
-
-               private final long totalCPUWaitTime;
-
-               private final long totalCPUBlockTime;
-
-               public CPUUtilizationSnapshot(long timestamp, long 
totalCPUTime, long totalCPUUserTime, long totalCPUWaitTime,
-                               long totalCPUBlockTime) {
-                       this.timestamp = timestamp;
-                       this.totalCPUTime = totalCPUTime;
-                       this.totalCPUUserTime = totalCPUUserTime;
-                       this.totalCPUWaitTime = totalCPUWaitTime;
-                       this.totalCPUBlockTime = totalCPUBlockTime;
-               }
-
-               public long getTimestamp() {
-                       return this.timestamp;
-               }
-
-               public long getTotalCPUTime() {
-                       return this.totalCPUTime;
-               }
-
-               public long getTotalCPUUserTime() {
-                       return this.totalCPUUserTime;
-               }
-
-               public long getTotalCPUWaitTime() {
-                       return this.totalCPUWaitTime;
-               }
-
-               public long getTotalCPUBlockTime() {
-                       return this.totalCPUBlockTime;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/InstanceProfiler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/InstanceProfiler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/InstanceProfiler.java
deleted file mode 100644
index d973017..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/InstanceProfiler.java
+++ /dev/null
@@ -1,342 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.profiling.impl;
-
-import java.io.BufferedReader;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.flink.runtime.profiling.ProfilingException;
-import 
org.apache.flink.runtime.profiling.impl.types.InternalInstanceProfilingData;
-import org.apache.flink.util.StringUtils;
-
-public class InstanceProfiler {
-
-       static final String PROC_MEMINFO = "/proc/meminfo";
-
-       static final String PROC_STAT = "/proc/stat";
-
-       static final String PROC_NET_DEV = "/proc/net/dev";
-
-       private static final String LOOPBACK_INTERFACE_NAME = "lo";
-
-       private static final Pattern CPU_PATTERN = Pattern
-               
.compile("^cpu\\s+(\\d+)\\s+(\\d+)\\s+(\\d+)\\s+(\\d+)\\s+(\\d+)\\s+(\\d+)\\s+(\\d+).+$");
-
-       private static final Pattern NETWORK_PATTERN = Pattern
-               
.compile("^\\s*(\\w+):\\s*(\\d+)\\s+\\d+\\s+\\d+\\s+\\d+\\s+\\d+\\s+\\d+\\s+\\d+\\s+\\d+\\s+(\\d+).+$");
-
-       private static final Pattern MEMORY_PATTERN = 
Pattern.compile("^\\w+:\\s*(\\d+)\\s+kB$");
-
-       private static final int PERCENT = 100;
-
-       private final String instancePath;
-
-       private long lastTimestamp = 0;
-
-       // CPU related variables
-       private long lastCpuUser = 0;
-
-       private long lastCpuNice = 0;
-
-       private long lastCpuSys = 0;
-
-       private long lastCpuIdle = 0;
-
-       private long lastCpuIOWait = 0;
-
-       private long lastCpuIrq = 0;
-
-       private long lastCpuSoftirq = 0;
-
-       // Network related variables
-       private long lastReceivedBytes = 0;
-
-       private long lastTramsmittedBytes = 0;
-
-       private long firstTimestamp;
-
-       public InstanceProfiler(String instancePath)
-                                                                               
                                                                        throws 
ProfilingException {
-
-               this.instancePath = instancePath;
-               this.firstTimestamp = System.currentTimeMillis();
-               // Initialize counters by calling generateProfilingData once 
and ignore the return value
-               generateProfilingData(this.firstTimestamp);
-       }
-
-       public InternalInstanceProfilingData generateProfilingData(long 
timestamp) throws ProfilingException {
-
-               final long profilingInterval = timestamp - lastTimestamp;
-
-               final InternalInstanceProfilingData profilingData = new 
InternalInstanceProfilingData(
-                       this.instancePath, (int) profilingInterval);
-
-               updateCPUUtilization(profilingData);
-               updateMemoryUtilization(profilingData);
-               updateNetworkUtilization(profilingData);
-
-               // Update timestamp
-               this.lastTimestamp = timestamp;
-
-               return profilingData;
-       }
-
-       private void updateMemoryUtilization(InternalInstanceProfilingData 
profilingData) throws ProfilingException {
-
-               BufferedReader in = null;
-
-               try {
-
-                       in = new BufferedReader(new FileReader(PROC_MEMINFO));
-
-                       long freeMemory = 0;
-                       long totalMemory = 0;
-                       long bufferedMemory = 0;
-                       long cachedMemory = 0;
-                       long cachedSwapMemory = 0;
-
-                       int count = 0;
-                       String output;
-                       while ((output = in.readLine()) != null) {
-
-                               switch (count) {
-                               case 0: // Total memory
-                                       totalMemory = 
extractMemoryValue(output);
-                                       break;
-                               case 1: // Free memory
-                                       freeMemory = extractMemoryValue(output);
-                                       break;
-                               case 2: // Buffers
-                                       bufferedMemory = 
extractMemoryValue(output);
-                                       break;
-                               case 3: // Cache
-                                       cachedMemory = 
extractMemoryValue(output);
-                                       break;
-                               case 4:
-                                       cachedSwapMemory = 
extractMemoryValue(output);
-                                       break;
-                               default:
-                                       break;
-                               }
-
-                               ++count;
-                       }
-
-                       profilingData.setTotalMemory(totalMemory);
-                       profilingData.setFreeMemory(freeMemory);
-                       profilingData.setBufferedMemory(bufferedMemory);
-                       profilingData.setCachedMemory(cachedMemory);
-                       profilingData.setCachedSwapMemory(cachedSwapMemory);
-
-               } catch (IOException ioe) {
-                       throw new ProfilingException("Error while reading 
network utilization: "
-                               + StringUtils.stringifyException(ioe));
-               } finally {
-                       if (in != null) {
-                               try {
-                                       in.close();
-                               } catch (IOException e) {
-                               }
-                       }
-               }
-
-       }
-
-       private long extractMemoryValue(String line) throws ProfilingException {
-
-               final Matcher matcher = MEMORY_PATTERN.matcher(line);
-               if (!matcher.matches()) {
-                       throw new ProfilingException("Cannot extract memory 
data for profiling from line " + line);
-               }
-
-               return Long.parseLong(matcher.group(1));
-       }
-
-       private void updateNetworkUtilization(InternalInstanceProfilingData 
profilingData) throws ProfilingException {
-
-               BufferedReader in = null;
-
-               try {
-
-                       in = new BufferedReader(new FileReader(PROC_NET_DEV));
-
-                       long receivedSum = 0;
-                       long transmittedSum = 0;
-
-                       String output;
-                       while ((output = in.readLine()) != null) {
-                               final Matcher networkMatcher = 
NETWORK_PATTERN.matcher(output);
-                               if (!networkMatcher.matches()) {
-                                       continue;
-                               }
-                               /*
-                                * Extract information according to
-                                * 
http://linuxdevcenter.com/pub/a/linux/2000/11/16/LinuxAdmin.html
-                                */
-
-                               if 
(LOOPBACK_INTERFACE_NAME.equals(networkMatcher.group(1))) {
-                                       continue;
-                               }
-
-                               receivedSum += 
Long.parseLong(networkMatcher.group(2));
-                               transmittedSum += 
Long.parseLong(networkMatcher.group(3));
-                       }
-
-                       in.close();
-                       in = null;
-
-                       profilingData.setReceivedBytes(receivedSum - 
this.lastReceivedBytes);
-                       profilingData.setTransmittedBytes(transmittedSum - 
this.lastTramsmittedBytes);
-
-                       // Store values for next call
-                       this.lastReceivedBytes = receivedSum;
-                       this.lastTramsmittedBytes = transmittedSum;
-
-               } catch (IOException ioe) {
-                       throw new ProfilingException("Error while reading 
network utilization: "
-                               + StringUtils.stringifyException(ioe));
-               } catch (NumberFormatException nfe) {
-                       throw new ProfilingException("Error while reading 
network utilization: "
-                               + StringUtils.stringifyException(nfe));
-               } finally {
-                       if (in != null) {
-                               try {
-                                       in.close();
-                               } catch (IOException e) {
-                               }
-                       }
-               }
-       }
-
-       private void updateCPUUtilization(InternalInstanceProfilingData 
profilingData) throws ProfilingException {
-
-               BufferedReader in = null;
-
-               try {
-
-                       in = new BufferedReader(new FileReader(PROC_STAT));
-                       final String output = in.readLine();
-                       if (output == null) {
-                               throw new ProfilingException("Cannot read CPU 
utilization, return value is null");
-                       }
-
-                       in.close();
-                       in = null;
-
-                       final Matcher cpuMatcher = CPU_PATTERN.matcher(output);
-                       if (!cpuMatcher.matches()) {
-                               throw new ProfilingException("Cannot extract 
CPU utilization from output \"" + output + "\"");
-                       }
-
-                       /*
-                        * Extract the information from the read line according 
to
-                        * http://www.linuxhowtos.org/System/procstat.htm
-                        */
-
-                       final long cpuUser = 
Long.parseLong(cpuMatcher.group(1));
-                       final long cpuNice = 
Long.parseLong(cpuMatcher.group(2));
-                       final long cpuSys = Long.parseLong(cpuMatcher.group(3));
-                       final long cpuIdle = 
Long.parseLong(cpuMatcher.group(4));
-                       final long cpuIOWait = 
Long.parseLong(cpuMatcher.group(5));
-                       final long cpuIrq = Long.parseLong(cpuMatcher.group(6));
-                       final long cpuSoftirq = 
Long.parseLong(cpuMatcher.group(7));
-
-                       // Calculate deltas
-                       final long deltaCpuUser = cpuUser - this.lastCpuUser;
-                       final long deltaCpuNice = cpuNice - this.lastCpuNice;
-                       final long deltaCpuSys = cpuSys - this.lastCpuSys;
-                       final long deltaCpuIdle = cpuIdle - this.lastCpuIdle;
-                       final long deltaCpuIOWait = cpuIOWait - 
this.lastCpuIOWait;
-                       final long deltaCpuIrq = cpuIrq - this.lastCpuIrq;
-                       final long deltaCpuSoftirq = cpuSoftirq - 
this.lastCpuSoftirq;
-                       final long deltaSum = deltaCpuUser + deltaCpuNice + 
deltaCpuSys + deltaCpuIdle + deltaCpuIOWait
-                               + deltaCpuIrq + deltaCpuSoftirq;
-
-                       // TODO: Fix deltaSum = 0 situation
-
-                       // Set the percentage values for the profiling data 
object
-                       /*
-                        * 
profilingData.setIdleCPU((int)((deltaCpuIdle*PERCENT)/deltaSum));
-                        * 
profilingData.setUserCPU((int)((deltaCpuUser*PERCENT)/deltaSum));
-                        * 
profilingData.setSystemCPU((int)((deltaCpuSys*PERCENT)/deltaSum));
-                        * 
profilingData.setIoWaitCPU((int)((deltaCpuIOWait*PERCENT)/deltaSum));
-                        * 
profilingData.setHardIrqCPU((int)((deltaCpuIrq*PERCENT)/deltaSum));
-                        * 
profilingData.setSoftIrqCPU((int)((deltaCpuSoftirq*PERCENT)/deltaSum));
-                        */
-                       // TODO: bad quick fix
-                       if (deltaSum > 0) {
-                               profilingData.setIdleCPU((int) ((deltaCpuIdle * 
PERCENT) / deltaSum));
-                               profilingData.setUserCPU((int) ((deltaCpuUser * 
PERCENT) / deltaSum));
-                               profilingData.setSystemCPU((int) ((deltaCpuSys 
* PERCENT) / deltaSum));
-                               profilingData.setIoWaitCPU((int) 
((deltaCpuIOWait * PERCENT) / deltaSum));
-                               profilingData.setHardIrqCPU((int) ((deltaCpuIrq 
* PERCENT) / deltaSum));
-                               profilingData.setSoftIrqCPU((int) 
((deltaCpuSoftirq * PERCENT) / deltaSum));
-                       } else {
-                               profilingData.setIdleCPU((int) (deltaCpuIdle));
-                               profilingData.setUserCPU((int) (deltaCpuUser));
-                               profilingData.setSystemCPU((int) (deltaCpuSys));
-                               profilingData.setIoWaitCPU((int) 
(deltaCpuIOWait));
-                               profilingData.setHardIrqCPU((int) 
(deltaCpuIrq));
-                               profilingData.setSoftIrqCPU((int) 
(deltaCpuSoftirq));
-                       }
-                       // Store values for next call
-                       this.lastCpuUser = cpuUser;
-                       this.lastCpuNice = cpuNice;
-                       this.lastCpuSys = cpuSys;
-                       this.lastCpuIdle = cpuIdle;
-                       this.lastCpuIOWait = cpuIOWait;
-                       this.lastCpuIrq = cpuIrq;
-                       this.lastCpuSoftirq = cpuSoftirq;
-
-               } catch (IOException ioe) {
-                       throw new ProfilingException("Error while reading CPU 
utilization: " + StringUtils.stringifyException(ioe));
-               } catch (NumberFormatException nfe) {
-                       throw new ProfilingException("Error while reading CPU 
utilization: " + StringUtils.stringifyException(nfe));
-               } finally {
-                       if (in != null) {
-                               try {
-                                       in.close();
-                               } catch (IOException e) {
-                               }
-                       }
-               }
-
-       }
-
-       /**
-        * @return InternalInstanceProfilingData ProfilingData for the instance 
from execution-start to currentTime
-        * @throws ProfilingException
-        */
-       public InternalInstanceProfilingData generateCheckpointProfilingData() 
throws ProfilingException {
-               final long profilingInterval = System.currentTimeMillis() - 
this.firstTimestamp;
-
-               final InternalInstanceProfilingData profilingData = new 
InternalInstanceProfilingData(
-                       this.instancePath, (int) profilingInterval);
-
-               updateCPUUtilization(profilingData);
-               updateMemoryUtilization(profilingData);
-               updateNetworkUtilization(profilingData);
-
-               return profilingData;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/JobProfilingData.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/JobProfilingData.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/JobProfilingData.java
deleted file mode 100644
index ce7db25..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/JobProfilingData.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.profiling.impl;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.SimpleSlot;
-import 
org.apache.flink.runtime.profiling.impl.types.InternalInstanceProfilingData;
-import org.apache.flink.runtime.profiling.types.InstanceSummaryProfilingEvent;
-
-public class JobProfilingData {
-
-       private final ExecutionGraph executionGraph;
-
-       private final long profilingStart;
-
-       private final Map<String, InternalInstanceProfilingData> 
collectedInstanceProfilingData = new
-                       HashMap<String, InternalInstanceProfilingData>();
-
-       
-       public JobProfilingData(ExecutionGraph executionGraph) {
-               this.executionGraph = executionGraph;
-               this.profilingStart = System.currentTimeMillis();
-       }
-
-       
-       public long getProfilingStart() {
-               return this.profilingStart;
-       }
-
-       public ExecutionGraph getExecutionGraph() {
-               return this.executionGraph;
-       }
-
-       public boolean 
addIfInstanceIsAllocatedByJob(InternalInstanceProfilingData 
instanceProfilingData) {
-
-               for (ExecutionVertex executionVertex : 
this.executionGraph.getAllExecutionVertices()) {
-                       SimpleSlot slot = 
executionVertex.getCurrentAssignedResource();
-                       if (slot != null && slot.getInstance().getPath().equals(
-                                       
instanceProfilingData.getInstancePath()))
-                       {
-                               
this.collectedInstanceProfilingData.put(instanceProfilingData.getInstancePath(),
 instanceProfilingData);
-                               return true;
-                       }
-               }
-
-               return false;
-       }
-
-       public InstanceSummaryProfilingEvent 
getInstanceSummaryProfilingData(long timestamp) {
-
-               final Set<Instance> tempSet = new HashSet<Instance>();
-               
-               for (ExecutionVertex executionVertex : 
this.executionGraph.getAllExecutionVertices()) {
-                       SimpleSlot slot = 
executionVertex.getCurrentAssignedResource();
-                       if (slot != null) {
-                               tempSet.add(slot.getInstance());
-                       }
-               }
-
-               // Now compare the size of the collected data set and the 
allocated instance set.
-               // If their sizes are equal we can issue an instance summary.
-               if (tempSet.size() != 
this.collectedInstanceProfilingData.size()) {
-                       return null;
-               }
-
-               return constructInstanceSummary(timestamp);
-       }
-
-       private InstanceSummaryProfilingEvent constructInstanceSummary(long 
timestamp) {
-
-               final int numberOfInstances = 
this.collectedInstanceProfilingData.size();
-               
-               final Iterator<String> instanceIterator = 
this.collectedInstanceProfilingData.keySet().iterator();
-
-               long freeMemorySum = 0;
-               long totalMemorySum = 0;
-               long bufferedMemorySum = 0;
-               long cachedMemorySum = 0;
-               long cachedSwapMemorySum = 0;
-
-               int ioWaitCPUSum = 0;
-               int idleCPUSum = 0;
-               int profilingIntervalSum = 0;
-               int systemCPUSum = 0;
-               int hardIrqCPUSum = 0;
-               int softIrqCPUSum = 0;
-
-               int userCPUSum = 0;
-               long receivedBytesSum = 0;
-               long transmittedBytesSum = 0;
-
-               // Sum up the individual values
-               while (instanceIterator.hasNext()) {
-
-                       final InternalInstanceProfilingData profilingData = 
this.collectedInstanceProfilingData.get(instanceIterator.next());
-
-                       freeMemorySum += profilingData.getFreeMemory();
-                       ioWaitCPUSum += profilingData.getIOWaitCPU();
-                       idleCPUSum += profilingData.getIdleCPU();
-                       profilingIntervalSum += 
profilingData.getProfilingInterval();
-                       systemCPUSum += profilingData.getSystemCPU();
-                       hardIrqCPUSum += profilingData.getHardIrqCPU();
-                       softIrqCPUSum += profilingData.getSoftIrqCPU();
-                       totalMemorySum += profilingData.getTotalMemory();
-                       userCPUSum += profilingData.getUserCPU();
-                       receivedBytesSum += profilingData.getReceivedBytes();
-                       transmittedBytesSum += 
profilingData.getTransmittedBytes();
-                       bufferedMemorySum += profilingData.getBufferedMemory();
-                       cachedMemorySum += profilingData.getCachedMemory();
-                       cachedSwapMemorySum += 
profilingData.getCachedSwapMemory();
-               }
-
-               final InstanceSummaryProfilingEvent instanceSummary = new 
InstanceSummaryProfilingEvent(
-                               profilingIntervalSum / numberOfInstances,
-                               ioWaitCPUSum / numberOfInstances,
-                               idleCPUSum / numberOfInstances,
-                               userCPUSum / numberOfInstances,
-                               systemCPUSum / numberOfInstances,
-                               hardIrqCPUSum / numberOfInstances,
-                               softIrqCPUSum / numberOfInstances,
-                               totalMemorySum / numberOfInstances,
-                               freeMemorySum / numberOfInstances,
-                               bufferedMemorySum / numberOfInstances,
-                               cachedMemorySum / numberOfInstances,
-                               cachedSwapMemorySum / numberOfInstances,
-                               receivedBytesSum / numberOfInstances,
-                               transmittedBytesSum / numberOfInstances,
-                               this.executionGraph.getJobID(), timestamp, 
(timestamp - this.profilingStart));
-
-               this.collectedInstanceProfilingData.clear();
-
-               return instanceSummary;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalExecutionVertexProfilingData.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalExecutionVertexProfilingData.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalExecutionVertexProfilingData.java
deleted file mode 100644
index 98ccfcf..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalExecutionVertexProfilingData.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.profiling.impl.types;
-
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-
-public abstract class InternalExecutionVertexProfilingData implements 
InternalProfilingData {
-
-       private final JobID jobId;
-       
-       private final JobVertexID vertexId;
-       
-       private int subtask;
-       
-       private final ExecutionAttemptID executionId;
-
-       
-       public InternalExecutionVertexProfilingData() {
-               this.jobId = new JobID();
-               this.vertexId = new JobVertexID();
-               this.executionId = new ExecutionAttemptID();
-               this.subtask = -1;
-       }
-
-       
-       public InternalExecutionVertexProfilingData(JobID jobId, JobVertexID 
vertexId, int subtask, ExecutionAttemptID executionId) {
-               this.jobId = jobId;
-               this.vertexId = vertexId;
-               this.subtask = subtask;
-               this.executionId = executionId;
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       
-       public JobID getJobID() {
-               return this.jobId;
-       }
-       
-       public JobVertexID getVertexId() {
-               return vertexId;
-       }
-       
-       public int getSubtask() {
-               return subtask;
-       }
-       
-       public ExecutionAttemptID getExecutionAttemptId() {
-               return this.executionId;
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Override
-       public void read(DataInputView in) throws IOException {
-               this.jobId.read(in);
-               this.vertexId.read(in);
-               this.executionId.read(in);
-               this.subtask = in.readInt();
-       }
-
-       @Override
-       public void write(DataOutputView out) throws IOException {
-               this.jobId.write(out);
-               this.vertexId.write(out);
-               this.executionId.write(out);
-               out.writeInt(this.subtask);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalExecutionVertexThreadProfilingData.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalExecutionVertexThreadProfilingData.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalExecutionVertexThreadProfilingData.java
deleted file mode 100644
index 92ce916..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalExecutionVertexThreadProfilingData.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.profiling.impl.types;
-
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-
-public class InternalExecutionVertexThreadProfilingData extends 
InternalExecutionVertexProfilingData {
-
-       private int profilingInterval = 0;
-
-       private int userTime = 0;
-
-       private int systemTime = 0;
-
-       private int blockedTime = 0;
-
-       private int waitedTime = 0;
-
-       public InternalExecutionVertexThreadProfilingData(JobID jobID, 
JobVertexID vertexId, int subtask, ExecutionAttemptID executionId,
-                       int profilingInterval, int userTime, int systemTime, 
int blockedTime, int waitedTime)
-       {
-               super(jobID, vertexId, subtask, executionId);
-
-               this.profilingInterval = profilingInterval;
-               this.userTime = userTime;
-               this.systemTime = systemTime;
-               this.blockedTime = blockedTime;
-               this.waitedTime = waitedTime;
-       }
-
-       public InternalExecutionVertexThreadProfilingData() {}
-
-       @Override
-       public void read(DataInputView in) throws IOException {
-               super.read(in);
-
-               this.profilingInterval = in.readInt();
-               this.userTime = in.readInt();
-               this.systemTime = in.readInt();
-               this.blockedTime = in.readInt();
-               this.waitedTime = in.readInt();
-       }
-
-       @Override
-       public void write(DataOutputView out) throws IOException {
-               super.write(out);
-
-               out.writeInt(this.profilingInterval);
-               out.writeInt(this.userTime);
-               out.writeInt(this.systemTime);
-               out.writeInt(this.blockedTime);
-               out.writeInt(this.waitedTime);
-       }
-
-       public int getBlockedTime() {
-               return this.blockedTime;
-       }
-
-       public int getProfilingInterval() {
-               return this.profilingInterval;
-       }
-
-       public int getSystemTime() {
-               return this.systemTime;
-       }
-
-       public int getUserTime() {
-               return this.userTime;
-       }
-
-       public int getWaitedTime() {
-               return this.waitedTime;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalInstanceProfilingData.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalInstanceProfilingData.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalInstanceProfilingData.java
deleted file mode 100644
index 145d11c..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalInstanceProfilingData.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.profiling.impl.types;
-
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-public class InternalInstanceProfilingData implements InternalProfilingData {
-
-       private String instancePath;
-
-       private int profilingInterval;
-
-       private int ioWaitCPU;
-
-       private int idleCPU;
-
-       private int userCPU;
-
-       private int systemCPU;
-
-       private int hardIrqCPU;
-
-       private int softIrqCPU;
-
-       private long totalMemory;
-
-       private long freeMemory;
-
-       private long bufferedMemory;
-
-       private long cachedMemory;
-
-       private long cachedSwapMemory;
-
-       private long receivedBytes;
-
-       private long transmittedBytes;
-
-       public InternalInstanceProfilingData() {
-               this.freeMemory = -1;
-               this.ioWaitCPU = -1;
-               this.idleCPU = -1;
-               this.instancePath = "";
-               this.profilingInterval = -1;
-               this.systemCPU = -1;
-               this.totalMemory = -1;
-               this.bufferedMemory = -1;
-               this.cachedMemory = -1;
-               this.cachedSwapMemory = -1;
-               this.userCPU = -1;
-               this.receivedBytes = -1;
-               this.transmittedBytes = -1;
-       }
-
-       public InternalInstanceProfilingData(String instancePath, int 
profilingInterval) {
-
-               this.instancePath = instancePath;
-               this.profilingInterval = profilingInterval;
-               this.freeMemory = -1;
-               this.ioWaitCPU = -1;
-               this.idleCPU = -1;
-               this.systemCPU = -1;
-               this.totalMemory = -1;
-               this.bufferedMemory = -1;
-               this.cachedMemory = -1;
-               this.cachedSwapMemory = -1;
-               this.userCPU = -1;
-               this.receivedBytes = -1;
-               this.transmittedBytes = -1;
-       }
-
-       public long getFreeMemory() {
-               return this.freeMemory;
-       }
-
-       public int getIOWaitCPU() {
-               return this.ioWaitCPU;
-       }
-
-       public int getIdleCPU() {
-               return this.idleCPU;
-       }
-
-       public int getHardIrqCPU() {
-               return this.hardIrqCPU;
-       }
-
-       public int getSoftIrqCPU() {
-               return this.softIrqCPU;
-       }
-
-       public String getInstancePath() {
-               return this.instancePath;
-       }
-
-       public int getProfilingInterval() {
-               return this.profilingInterval;
-       }
-
-       public int getSystemCPU() {
-               return this.systemCPU;
-       }
-
-       public long getTotalMemory() {
-               return this.totalMemory;
-       }
-
-       public long getBufferedMemory() {
-               return this.bufferedMemory;
-       }
-
-       public long getCachedMemory() {
-               return this.cachedMemory;
-       }
-
-       public long getCachedSwapMemory() {
-               return this.cachedSwapMemory;
-       }
-
-       public int getUserCPU() {
-               return this.userCPU;
-       }
-
-       public long getReceivedBytes() {
-               return this.receivedBytes;
-       }
-
-       public long getTransmittedBytes() {
-               return this.transmittedBytes;
-       }
-
-       @Override
-       public void read(DataInputView in) throws IOException {
-
-               this.freeMemory = in.readLong();
-               this.ioWaitCPU = in.readInt();
-               this.idleCPU = in.readInt();
-               this.instancePath = in.readUTF();
-               this.profilingInterval = in.readInt();
-               this.systemCPU = in.readInt();
-               this.totalMemory = in.readLong();
-               this.bufferedMemory = in.readLong();
-               this.cachedMemory = in.readLong();
-               this.cachedSwapMemory = in.readLong();
-               this.userCPU = in.readInt();
-               this.receivedBytes = in.readLong();
-               this.transmittedBytes = in.readLong();
-               this.hardIrqCPU = in.readInt();
-               this.softIrqCPU = in.readInt();
-
-       }
-
-       @Override
-       public void write(DataOutputView out) throws IOException {
-
-               out.writeLong(this.freeMemory);
-               out.writeInt(this.ioWaitCPU);
-               out.writeInt(this.idleCPU);
-               out.writeUTF(instancePath);
-               out.writeInt(this.profilingInterval);
-               out.writeInt(this.systemCPU);
-               out.writeLong(this.totalMemory);
-               out.writeLong(this.bufferedMemory);
-               out.writeLong(this.cachedMemory);
-               out.writeLong(this.cachedSwapMemory);
-               out.writeInt(this.userCPU);
-               out.writeLong(this.receivedBytes);
-               out.writeLong(this.transmittedBytes);
-               out.writeInt(this.hardIrqCPU);
-               out.writeInt(this.softIrqCPU);
-
-       }
-
-       public void setFreeMemory(long freeMemory) {
-               this.freeMemory = freeMemory;
-       }
-
-       public void setIoWaitCPU(int ioWaitCPU) {
-               this.ioWaitCPU = ioWaitCPU;
-       }
-
-       public void setIdleCPU(int idleCPU) {
-               this.idleCPU = idleCPU;
-       }
-
-       public void setSystemCPU(int systemCPU) {
-               this.systemCPU = systemCPU;
-       }
-
-       public void setHardIrqCPU(int hardIrqCPU) {
-               this.hardIrqCPU = hardIrqCPU;
-       }
-
-       public void setSoftIrqCPU(int softIrqCPU) {
-               this.softIrqCPU = softIrqCPU;
-       }
-
-       public void setTotalMemory(long totalMemory) {
-               this.totalMemory = totalMemory;
-       }
-
-       public void setBufferedMemory(long bufferedMemory) {
-               this.bufferedMemory = bufferedMemory;
-       }
-
-       public void setCachedMemory(long cachedMemory) {
-               this.cachedMemory = cachedMemory;
-       }
-
-       public void setCachedSwapMemory(long cachedSwapMemory) {
-               this.cachedSwapMemory = cachedSwapMemory;
-       }
-
-       public void setUserCPU(int userCPU) {
-               this.userCPU = userCPU;
-       }
-
-       public void setReceivedBytes(long receivedBytes) {
-               this.receivedBytes = receivedBytes;
-       }
-
-       public void setTransmittedBytes(long transmittedBytes) {
-               this.transmittedBytes = transmittedBytes;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalProfilingData.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalProfilingData.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalProfilingData.java
deleted file mode 100644
index c6f53d1..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalProfilingData.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.profiling.impl.types;
-
-import org.apache.flink.core.io.IOReadableWritable;
-
-public interface InternalProfilingData extends IOReadableWritable {}

http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/ProfilingDataContainer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/ProfilingDataContainer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/ProfilingDataContainer.java
deleted file mode 100644
index b233b84..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/ProfilingDataContainer.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.profiling.impl.types;
-
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.Iterator;
-import java.util.Queue;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.StringUtils;
-
-public class ProfilingDataContainer implements IOReadableWritable {
-
-       private final Queue<InternalProfilingData> queuedProfilingData = new 
ArrayDeque<InternalProfilingData>();
-
-       public void addProfilingData(InternalProfilingData profilingData) {
-
-               if (profilingData == null) {
-                       return;
-               }
-
-               this.queuedProfilingData.add(profilingData);
-       }
-
-       public void clear() {
-               this.queuedProfilingData.clear();
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public void read(DataInputView in) throws IOException {
-
-               final int numberOfRecords = in.readInt();
-               for (int i = 0; i < numberOfRecords; i++) {
-                       final String className = 
StringUtils.readNullableString(in);
-
-                       Class<? extends InternalProfilingData> clazz;
-                       try {
-                               clazz = (Class<? extends 
InternalProfilingData>) Class.forName(className);
-                       } catch (Exception e) {
-                               throw new IOException(e);
-                       }
-
-                       InternalProfilingData profilingData ;
-                       try {
-                               profilingData = clazz.newInstance();
-                       } catch (Exception e) {
-                               throw new IOException(e);
-                       }
-
-                       // Restore internal state
-                       profilingData.read(in);
-
-                       this.queuedProfilingData.add(profilingData);
-               }
-       }
-
-       public int size() {
-               return this.queuedProfilingData.size();
-       }
-
-       public boolean isEmpty() {
-               return this.queuedProfilingData.isEmpty();
-       }
-
-       public Iterator<InternalProfilingData> getIterator() {
-               return this.queuedProfilingData.iterator();
-       }
-
-       @Override
-       public void write(DataOutputView out) throws IOException {
-
-               // Write the number of records
-               out.writeInt(this.queuedProfilingData.size());
-               // Write the records themselves
-               for (InternalProfilingData profilingData : 
this.queuedProfilingData) {
-                       
StringUtils.writeNullableString(profilingData.getClass().getName(), out);
-                       profilingData.write(out);
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/InstanceProfilingEvent.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/InstanceProfilingEvent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/InstanceProfilingEvent.java
deleted file mode 100644
index 7de9252..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/InstanceProfilingEvent.java
+++ /dev/null
@@ -1,399 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.profiling.types;
-
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.api.common.JobID;
-
-/**
- * Instance profiling events are a special subclass of profiling events. They 
contain profiling information about the
- * utilization of a particular instance during a job execution.
- */
-public abstract class InstanceProfilingEvent extends ProfilingEvent {
-
-       private static final long serialVersionUID = 5964092674722506040L;
-
-       /**
-        * The interval of time this profiling event covers in milliseconds.
-        */
-       private int profilingInterval;
-
-       /**
-        * The percentage of time the CPU(s) spent in state IOWAIT during the 
profiling interval.
-        */
-       private int ioWaitCPU;
-
-       /**
-        * The percentage of time the CPU(s) spent in state IDLE during the 
profiling interval.
-        */
-       private int idleCPU;
-
-       /**
-        * The percentage of time the CPU(s) spent in state USER during the 
profiling interval.
-        */
-       private int userCPU;
-
-       /**
-        * The percentage of time the CPU(s) spent in state SYSTEM during the 
profiling interval.
-        */
-       private int systemCPU;
-
-       /**
-        * The percentage of time the CPU(s) spent in state HARD_IRQ during the 
profiling interval.
-        */
-       private int hardIrqCPU;
-
-       /**
-        * The percentage of time the CPU(s) spent in state SOFT_IRQ during the 
profiling interval.
-        */
-       private int softIrqCPU;
-
-       /**
-        * The total amount of this instance's main memory in bytes.
-        */
-       private long totalMemory;
-
-       /**
-        * The free amount of this instance's main memory in bytes.
-        */
-       private long freeMemory;
-
-       /**
-        * The amount of main memory the instance uses for file buffers.
-        */
-       private long bufferedMemory;
-
-       /**
-        * The amount of main memory the instance uses as cache memory.
-        */
-       private long cachedMemory;
-
-       /**
-        * The amount of main memory the instance uses for cached swaps.
-        */
-       private long cachedSwapMemory;
-
-       /**
-        * The number of bytes received via network during the profiling 
interval.
-        */
-       private long receivedBytes;
-
-       /**
-        * The number of bytes transmitted via network during the profiling 
interval.
-        */
-       private long transmittedBytes;
-
-       /**
-        * Constructs a new instance profiling event.
-        * 
-        * @param profilingInterval
-        *        the interval of time this profiling event covers in 
milliseconds
-        * @param ioWaitCPU
-        *        the percentage of time the CPU(s) spent in state IOWAIT 
during the profiling interval
-        * @param idleCPU
-        *        the percentage of time the CPU(s) spent in state IDLE during 
the profiling interval
-        * @param userCPU
-        *        the percentage of time the CPU(s) spent in state USER during 
the profiling interval
-        * @param systemCPU
-        *        the percentage of time the CPU(s) spent in state SYSTEM 
during the profiling interval
-        * @param hardIrqCPU
-        *        the percentage of time the CPU(s) spent in state HARD_IRQ 
during the profiling interval
-        * @param softIrqCPU
-        *        the percentage of time the CPU(s) spent in state SOFT_IRQ 
during the profiling interval
-        * @param totalMemory
-        *        the total amount of this instance's main memory in bytes
-        * @param freeMemory
-        *        the free amount of this instance's main memory in bytes
-        * @param bufferedMemory
-        *        the amount of main memory the instance uses for file buffers
-        * @param cachedMemory
-        *        the amount of main memory the instance uses as cache memory
-        * @param cachedSwapMemory
-        *        The amount of main memory the instance uses for cached swaps
-        * @param receivedBytes
-        *        the number of bytes received via network during the profiling 
interval
-        * @param transmittedBytes
-        *        the number of bytes transmitted via network during the 
profiling interval
-        * @param jobID
-        *        the ID of this job this profiling event belongs to
-        * @param timestamp
-        *        the time stamp of this profiling event's creation
-        * @param profilingTimestamp
-        *        the time stamp relative to the beginning of the job's 
execution
-        */
-       public InstanceProfilingEvent(final int profilingInterval, final int 
ioWaitCPU, final int idleCPU,
-                       final int userCPU, final int systemCPU, final int 
hardIrqCPU, final int softIrqCPU, final long totalMemory,
-                       final long freeMemory, final long bufferedMemory, final 
long cachedMemory, final long cachedSwapMemory,
-                       final long receivedBytes, final long transmittedBytes, 
final JobID jobID, final long timestamp,
-                       final long profilingTimestamp) {
-
-               super(jobID, timestamp, profilingTimestamp);
-
-               this.profilingInterval = profilingInterval;
-
-               this.ioWaitCPU = ioWaitCPU;
-               this.idleCPU = idleCPU;
-               this.userCPU = userCPU;
-               this.systemCPU = systemCPU;
-               this.hardIrqCPU = hardIrqCPU;
-               this.softIrqCPU = softIrqCPU;
-
-               this.totalMemory = totalMemory;
-               this.freeMemory = freeMemory;
-               this.bufferedMemory = bufferedMemory;
-               this.cachedMemory = cachedMemory;
-               this.cachedSwapMemory = cachedSwapMemory;
-
-               this.receivedBytes = receivedBytes;
-               this.transmittedBytes = transmittedBytes;
-       }
-
-       /**
-        * Default constructor for serialization/deserialization.
-        */
-       public InstanceProfilingEvent() {
-               super();
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       
-       /**
-        * Returns the interval of time this profiling event covers in 
milliseconds.
-        * 
-        * @return the interval of time this profiling event covers in 
milliseconds
-        */
-       public final int getProfilingInterval() {
-               return this.profilingInterval;
-       }
-
-       /**
-        * Returns the total amount of memory of the corresponding instance.
-        * 
-        * @return the total amount of memory in bytes
-        */
-       public final long getTotalMemory() {
-               return this.totalMemory;
-       }
-
-       /**
-        * Returns the amount of free memory of the corresponding instance.
-        * 
-        * @return the amount of free memory in bytes.
-        */
-       public final long getFreeMemory() {
-               return this.freeMemory;
-       }
-
-       /**
-        * Returns the amount of memory, in bytes, used for file buffers.
-        * 
-        * @return the amount of memory used for file buffers in bytes
-        */
-       public final long getBufferedMemory() {
-               return this.bufferedMemory;
-       }
-
-       /**
-        * Returns the amount of memory, in bytes, used as cache memory.
-        * 
-        * @return the amount of memory used as cache memory in bytes
-        */
-       public final long getCachedMemory() {
-               return this.cachedMemory;
-       }
-
-       /**
-        * Returns the amount of swap, in bytes, used as cache memory.
-        * 
-        * @return the amount of, in bytes, used as cache memory
-        */
-       public final long getCachedSwapMemory() {
-               return this.cachedSwapMemory;
-       }
-
-       /**
-        * Returns the percentage of time the CPU(s) spent in state USER during 
the profiling interval.
-        * 
-        * @return the percentage of time the CPU(s) spent in state USER during 
the profiling interval
-        */
-       public final int getUserCPU() {
-               return this.userCPU;
-       }
-
-       /**
-        * Returns the percentage of time the CPU(s) spent in state SYSTEM 
during the profiling interval.
-        * 
-        * @return the percentage of time the CPU(s) spent in state SYSTEM 
during the profiling interval
-        */
-       public final int getSystemCPU() {
-               return this.systemCPU;
-       }
-
-       /**
-        * Returns the percentage of time the CPU(s) spent in state IDLE during 
the profiling interval. Prior to Linux
-        * 2.5.41, this includes IO-wait time.
-        * 
-        * @return the percentage of time the CPU(s) spent in state IDLE during 
the profiling interval
-        */
-       public final int getIdleCPU() {
-               return this.idleCPU;
-       }
-
-       /**
-        * Returns the percentage of time the CPU(s) spent in state IOWAIT 
during the profiling interval. Prior to Linux
-        * 2.5.41, included in idle.
-        * 
-        * @return the percentage of time the CPU(s) spent in state IOWAIT 
during the profiling interval.
-        */
-       public final int getIOWaitCPU() {
-               return this.ioWaitCPU;
-       }
-
-       /**
-        * Returns the percentage of time the CPU(s) spent in state HARD_IRQ 
during the profiling interval.
-        * 
-        * @return the percentage of time the CPU(s) spent in state HARD_IRQ 
during the profiling interval
-        */
-       public final int getHardIrqCPU() {
-               return this.hardIrqCPU;
-       }
-
-       /**
-        * Returns the percentage of time the CPU(s) spent in state SOFT_IRQ 
during the profiling interval.
-        * 
-        * @return the percentage of time the CPU(s) spent in state SOFT_IRQ 
during the profiling interval
-        */
-       public final int getSoftIrqCPU() {
-               return this.softIrqCPU;
-       }
-
-       /**
-        * Returns the number of bytes received via network during the 
profiling interval.
-        * 
-        * @return the number of bytes received via network during the 
profiling interval
-        */
-       public final long getReceivedBytes() {
-               return this.receivedBytes;
-       }
-
-       /**
-        * Returns the number of bytes transmitted via network during the 
profiling interval.
-        * 
-        * @return the number of bytes transmitted via network during the 
profiling interval
-        */
-       public final long getTransmittedBytes() {
-               return this.transmittedBytes;
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Serialization
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Override
-       public void read(final DataInputView in) throws IOException {
-               super.read(in);
-
-               this.profilingInterval = in.readInt();
-
-               this.ioWaitCPU = in.readInt();
-               this.idleCPU = in.readInt();
-               this.userCPU = in.readInt();
-               this.systemCPU = in.readInt();
-               this.hardIrqCPU = in.readInt();
-               this.softIrqCPU = in.readInt();
-
-               this.totalMemory = in.readLong();
-               this.freeMemory = in.readLong();
-               this.bufferedMemory = in.readLong();
-               this.cachedMemory = in.readLong();
-               this.cachedSwapMemory = in.readLong();
-
-               this.receivedBytes = in.readLong();
-               this.transmittedBytes = in.readLong();
-       }
-
-
-       @Override
-       public void write(final DataOutputView out) throws IOException {
-               super.write(out);
-
-               out.writeInt(this.profilingInterval);
-
-               out.writeInt(this.ioWaitCPU);
-               out.writeInt(this.idleCPU);
-               out.writeInt(this.userCPU);
-               out.writeInt(this.systemCPU);
-               out.writeInt(this.hardIrqCPU);
-               out.writeInt(this.softIrqCPU);
-
-               out.writeLong(totalMemory);
-               out.writeLong(freeMemory);
-               out.writeLong(bufferedMemory);
-               out.writeLong(cachedMemory);
-               out.writeLong(cachedSwapMemory);
-
-               out.writeLong(receivedBytes);
-               out.writeLong(transmittedBytes);
-       }
-
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Utilities
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Override
-       public boolean equals(final Object obj) {
-               if (obj instanceof InstanceProfilingEvent) {
-                       final InstanceProfilingEvent other = 
(InstanceProfilingEvent) obj;
-                       
-                       return super.equals(obj) &&
-                                       profilingInterval == 
other.profilingInterval &&
-                                       ioWaitCPU == other.ioWaitCPU &&
-                                       idleCPU == other.idleCPU &&
-                                       userCPU == other.userCPU &&
-                                       systemCPU == other.systemCPU &&
-                                       hardIrqCPU == other.hardIrqCPU &&
-                                       softIrqCPU == other.softIrqCPU &&
-                                       totalMemory == other.totalMemory &&
-                                       freeMemory == other.freeMemory  &&
-                                       bufferedMemory == other.bufferedMemory 
&&
-                                       cachedMemory == other.cachedMemory &&
-                                       cachedSwapMemory == 
other.cachedSwapMemory &&
-                                       receivedBytes == other.receivedBytes &&
-                                       transmittedBytes == 
other.transmittedBytes;
-               }
-               else {
-                       return false;
-               }
-       }
-
-
-       @Override
-       public int hashCode() {
-               long hashCode = getJobID().hashCode() + getTimestamp() + 
getProfilingTimestamp();
-               hashCode += this.profilingInterval + this.ioWaitCPU + 
this.idleCPU + this.userCPU + this.systemCPU
-                       + this.hardIrqCPU + this.softIrqCPU;
-               hashCode += (this.totalMemory + this.freeMemory + 
this.bufferedMemory + this.cachedMemory + this.cachedSwapMemory);
-               hashCode ^= hashCode >>> 32;
-               return (int) hashCode;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/InstanceSummaryProfilingEvent.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/InstanceSummaryProfilingEvent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/InstanceSummaryProfilingEvent.java
deleted file mode 100644
index 4bb052d..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/InstanceSummaryProfilingEvent.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.profiling.types;
-
-import org.apache.flink.api.common.JobID;
-
-/**
- * Instance summary profiling events summarize the profiling events of all 
instances involved in computing a job.
- */
-public final class InstanceSummaryProfilingEvent extends 
InstanceProfilingEvent {
-
-       private static final long serialVersionUID = 1L;
-
-       /**
-        * Constructs a new instance summary profiling event.
-        * 
-        * @param profilingInterval
-        *        the interval of time this profiling event covers in 
milliseconds
-        * @param ioWaitCPU
-        *        the percentage of time the CPU(s) spent in state IOWAIT 
during the profiling interval
-        * @param idleCPU
-        *        the percentage of time the CPU(s) spent in state IDLE during 
the profiling interval
-        * @param userCPU
-        *        the percentage of time the CPU(s) spent in state USER during 
the profiling interval
-        * @param systemCPU
-        *        the percentage of time the CPU(s) spent in state SYSTEM 
during the profiling interval
-        * @param hardIrqCPU
-        *        the percentage of time the CPU(s) spent in state HARD_IRQ 
during the profiling interval
-        * @param softIrqCPU
-        *        the percentage of time the CPU(s) spent in state SOFT_IRQ 
during the profiling interval
-        * @param totalMemory
-        *        the total amount of this instance's main memory in bytes
-        * @param freeMemory
-        *        the free amount of this instance's main memory in bytes
-        * @param bufferedMemory
-        *        the amount of main memory the instance uses for file buffers
-        * @param cachedMemory
-        *        the amount of main memory the instance uses as cache memory
-        * @param cachedSwapMemory
-        *        The amount of main memory the instance uses for cached swaps
-        * @param receivedBytes
-        *        the number of bytes received via network during the profiling 
interval
-        * @param transmittedBytes
-        *        the number of bytes transmitted via network during the 
profiling interval
-        * @param jobID
-        *        the ID of this job this profiling event belongs to
-        * @param timestamp
-        *        the time stamp of this profiling event's creation
-        * @param profilingTimestamp
-        *        the time stamp relative to the beginning of the job's 
execution
-        */
-       public InstanceSummaryProfilingEvent(final int profilingInterval, final 
int ioWaitCPU, final int idleCPU,
-                       final int userCPU, final int systemCPU, final int 
hardIrqCPU, final int softIrqCPU, final long totalMemory,
-                       final long freeMemory, final long bufferedMemory, final 
long cachedMemory, final long cachedSwapMemory,
-                       final long receivedBytes, final long transmittedBytes, 
final JobID jobID,
-                       final long timestamp, final long profilingTimestamp) {
-               super(profilingInterval, ioWaitCPU, idleCPU, userCPU, 
systemCPU, hardIrqCPU, softIrqCPU, totalMemory,
-                       freeMemory, bufferedMemory, cachedMemory, 
cachedSwapMemory, receivedBytes, transmittedBytes, jobID,
-                       timestamp, profilingTimestamp);
-       }
-
-       /**
-        * Default constructor for serialization/deserialization.
-        */
-       public InstanceSummaryProfilingEvent() {
-               super();
-       }
-
-       @Override
-       public boolean equals(Object obj) {
-               return (obj instanceof InstanceSummaryProfilingEvent) && 
super.equals(obj);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/ProfilingEvent.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/ProfilingEvent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/ProfilingEvent.java
deleted file mode 100644
index 13a9a99..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/ProfilingEvent.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.profiling.types;
-
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.event.job.AbstractEvent;
-import org.apache.flink.api.common.JobID;
-
-import com.google.common.base.Preconditions;
-
-/**
- * A profiling event is a special type of event. It is intended to transport 
profiling data of a Nephele job to external
- * components.
- */
-public abstract class ProfilingEvent extends AbstractEvent {
-
-       private static final long serialVersionUID = 1L;
-
-       /** The ID of the job the profiling data belongs to. */
-       private final JobID jobID;
-
-       /** The profiling time stamp. */
-       private long profilingTimestamp;
-
-       /**
-        * Constructs a new profiling event.
-        * 
-        * @param jobID
-        *        the ID of the job this profiling events belongs to
-        * @param timestamp
-        *        the time stamp of the event
-        * @param profilingTimestamp
-        *        the time stamp of the profiling data
-        */
-       public ProfilingEvent(JobID jobID, long timestamp, long 
profilingTimestamp) {
-               super(timestamp);
-
-               Preconditions.checkNotNull(jobID);
-               this.jobID = jobID;
-               this.profilingTimestamp = profilingTimestamp;
-       }
-
-       /**
-        * Default constructor for serialization/deserialization.
-        */
-       public ProfilingEvent() {
-               super();
-               this.jobID = new JobID();
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       
-       /**
-        * Returns the ID of the job this profiling information belongs to.
-        * 
-        * @return the ID of the job this profiling information belongs to
-        */
-       public JobID getJobID() {
-               return this.jobID;
-       }
-
-       /**
-        * Returns the timestamp of this profiling information. The timestamp 
denotes
-        * the time period in milliseconds between the creation of this 
profiling information
-        * and the start of the corresponding vertex's execution.
-        * 
-        * @return the timestamp of this profiling information.
-        */
-       public long getProfilingTimestamp() {
-               return this.profilingTimestamp;
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Serialization
-       // 
--------------------------------------------------------------------------------------------
-
-       @Override
-       public void read(DataInputView in) throws IOException {
-               super.read(in);
-
-               this.jobID.read(in);
-               this.profilingTimestamp = in.readLong();
-       }
-
-
-       @Override
-       public void write(DataOutputView out) throws IOException {
-               super.write(out);
-
-               this.jobID.write(out);
-               out.writeLong(this.profilingTimestamp);
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       // Utilities
-       // 
--------------------------------------------------------------------------------------------
-
-       @Override
-       public boolean equals(Object obj) {
-               if (obj instanceof ProfilingEvent) {
-                       final ProfilingEvent other = (ProfilingEvent) obj;
-                       
-                       return super.equals(obj) && this.profilingTimestamp == 
other.profilingTimestamp &&
-                                       this.jobID.equals(other.jobID);
-               }
-               else {
-                       return false;
-               }
-       }
-
-       @Override
-       public int hashCode() {
-               return this.jobID.hashCode() ^ ((int) (profilingTimestamp >>> 
32)) ^ ((int) (profilingTimestamp)) ^
-                               super.hashCode();
-       }
-}

Reply via email to