[FLINK-1969] [runtime] Remove deprecated profiler code
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fbea2da2 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fbea2da2 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fbea2da2 Branch: refs/heads/master Commit: fbea2da26d01c470687a5ad217a5fd6ad1de89e4 Parents: 59faf46 Author: Stephan Ewen <se...@apache.org> Authored: Sun May 3 14:10:35 2015 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Mon May 11 21:13:44 2015 +0200 ---------------------------------------------------------------------- .../runtime/profiling/JobManagerProfiler.java | 75 ---- .../runtime/profiling/ProfilingException.java | 42 -- .../runtime/profiling/ProfilingListener.java | 27 -- .../flink/runtime/profiling/ProfilingUtils.java | 180 --------- .../runtime/profiling/TaskManagerProfiler.java | 56 --- .../profiling/impl/EnvironmentThreadSet.java | 217 ---------- .../profiling/impl/InstanceProfiler.java | 342 ---------------- .../profiling/impl/JobProfilingData.java | 158 -------- .../InternalExecutionVertexProfilingData.java | 90 ----- ...ernalExecutionVertexThreadProfilingData.java | 96 ----- .../types/InternalInstanceProfilingData.java | 244 ------------ .../impl/types/InternalProfilingData.java | 23 -- .../impl/types/ProfilingDataContainer.java | 101 ----- .../profiling/types/InstanceProfilingEvent.java | 399 ------------------- .../types/InstanceSummaryProfilingEvent.java | 89 ----- .../runtime/profiling/types/ProfilingEvent.java | 135 ------- .../types/SingleInstanceProfilingEvent.java | 147 ------- .../profiling/types/ThreadProfilingEvent.java | 150 ------- .../profiling/types/VertexProfilingEvent.java | 142 ------- .../flink/runtime/jobmanager/JobManager.scala | 26 +- .../runtime/jobmanager/JobManagerProfiler.scala | 55 --- .../messages/JobManagerProfilerMessages.scala | 34 -- .../messages/TaskManagerProfilerMessages.scala | 55 --- .../taskmanager/TaskManagerProfiler.scala | 182 --------- .../profiling/impl/InstanceProfilerTest.java | 182 --------- .../profiling/types/ProfilingTypesTest.java | 177 -------- .../runtime/testingUtils/TestingCluster.scala | 4 +- .../test/util/ForkableFlinkMiniCluster.scala | 4 +- .../apache/flink/yarn/ApplicationMaster.scala | 10 +- 29 files changed, 12 insertions(+), 3430 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/JobManagerProfiler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/JobManagerProfiler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/JobManagerProfiler.java deleted file mode 100644 index 4c4a20e..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/JobManagerProfiler.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.profiling; - -import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.api.common.JobID; - -/** - * This interface must be implemented by profiling components - * for the job manager. - * - */ -public interface JobManagerProfiler { - - /** - * Registers the given {@link ExecutionGraph} for profiling. - * - * @param executionGraph - * the {@link ExecutionGraph} to register for profiling - */ - void registerProfilingJob(ExecutionGraph executionGraph); - - /** - * Unregisters the given {@link ExecutionGraph} from profiling. Calling this - * method will also unregister all of the job's registered listeners. - * - * @param executionGraph the {@link ExecutionGraph} to unregister. - */ - void unregisterProfilingJob(ExecutionGraph executionGraph); - - /** - * Registers the given {@link ProfilingListener} object to receive - * profiling data for the job with the given job ID. - * - * @param jobID - * the ID of the job to receive profiling data for - * @param profilingListener - * the {@link ProfilingListener} object to register - */ - void registerForProfilingData(JobID jobID, ProfilingListener profilingListener); - - /** - * Unregisters the given {@link ProfilingListener} object from receiving - * profiling data issued by the job manager's profiling component. - * - * @param jobID - * the ID of the job the {@link ProfilingListener} object has originally been registered for - * @param profilingListener - * the {@link ProfilingListener} object to unregister - */ - void unregisterFromProfilingData(JobID jobID, ProfilingListener profilingListener); - - /** - * Shuts done the job manager's profiling component - * and stops all its internal processes. - */ - void shutdown(); -} http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/ProfilingException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/ProfilingException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/ProfilingException.java deleted file mode 100644 index f3148d9..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/ProfilingException.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.profiling; - -/** - * A profiling exception is thrown if an error occur during profiling execution. - * - */ -public class ProfilingException extends Exception { - - /** - * Generated serialVersionUID. - */ - private static final long serialVersionUID = -3282996556813630561L; - - /** - * Constructs a new profiling exception with the given error message. - * - * @param errorMsg - * The error message to be included in the exception. - */ - public ProfilingException(String errorMsg) { - super(errorMsg); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/ProfilingListener.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/ProfilingListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/ProfilingListener.java deleted file mode 100644 index b3116ed..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/ProfilingListener.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.profiling; - -import org.apache.flink.runtime.profiling.types.ProfilingEvent; - -public interface ProfilingListener { - - void processProfilingEvents(ProfilingEvent profilingEvent); -} http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/ProfilingUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/ProfilingUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/ProfilingUtils.java deleted file mode 100644 index 95211be..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/ProfilingUtils.java +++ /dev/null @@ -1,180 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.profiling; - -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; -import java.net.InetAddress; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.flink.runtime.instance.InstanceConnectionInfo; -import org.apache.flink.util.StringUtils; - -/** - * This class contains utility functions to load and configure Nephele's - * profiling component. - * - */ -public class ProfilingUtils { - - /** - * The logging instance used to report problems. - */ - private static final Logger LOG = LoggerFactory.getLogger(ProfilingUtils.class); - - /** - * The key to check the job manager's profiling component should be enabled. - */ - public static final String ENABLE_PROFILING_KEY = "jobmanager.profiling.enable"; - - /** - * The class name of the job manager's profiling component to load if progiling is enabled. - */ - public static final String JOBMANAGER_CLASSNAME_KEY = "jobmanager.profiling.classname"; - - /** - * The class name of the task manager's profiling component to load if profiling is enabled. - */ - public static final String TASKMANAGER_CLASSNAME_KEY = "taskmanager.profiling.classname"; - - /** - * The key to check whether job profiling should be enabled for a specific job. - */ - public static final String PROFILE_JOB_KEY = "job.profiling.enable"; - - /** - * The key to check the port of the job manager's profiling RPC server. - */ - public static final String JOBMANAGER_RPC_PORT_KEY = "jobmanager.profiling.rpc.port"; - - /** - * The default network port the job manager's profiling component starts its RPC server on. - */ - public static final int JOBMANAGER_DEFAULT_RPC_PORT = 6124; - - /** - * Key to interval in which a task manager is supposed to send profiling data to the job manager. - */ - public static final String TASKMANAGER_REPORTINTERVAL_KEY = "taskmanager.profiling.reportinterval"; - - /** - * Default interval which the task manager uses to report profiling data to the job manager. - */ - public static final int DEFAULT_TASKMANAGER_REPORTINTERVAL = 2; - - /** - * Creates an instance of the job manager's profiling component. - * - * @param profilerClassName - * the class name of the profiling component to load - * @param jobManagerBindAddress - * the address the job manager's RPC server is bound to - * @return an instance of the job manager profiling component or <code>null</code> if an error occurs - */ - @SuppressWarnings("unchecked") - public static JobManagerProfiler loadJobManagerProfiler(String profilerClassName, InetAddress jobManagerBindAddress) { - - final Class<? extends JobManagerProfiler> profilerClass; - try { - profilerClass = (Class<? extends JobManagerProfiler>) Class.forName(profilerClassName); - } catch (ClassNotFoundException e) { - LOG.error("Cannot find class " + profilerClassName + ": " + StringUtils.stringifyException(e)); - return null; - } - - JobManagerProfiler profiler = null; - - try { - - final Constructor<JobManagerProfiler> constr = (Constructor<JobManagerProfiler>) profilerClass.getConstructor(InetAddress.class); - profiler = constr.newInstance(jobManagerBindAddress); - - } catch(InvocationTargetException e) { - LOG.error("Cannot create profiler: " + StringUtils.stringifyException(e)); - return null; - } catch (NoSuchMethodException e) { - LOG.error("Cannot create profiler: " + StringUtils.stringifyException(e)); - return null; - } catch (InstantiationException e) { - LOG.error("Cannot create profiler: " + StringUtils.stringifyException(e)); - return null; - } catch (IllegalAccessException e) { - LOG.error("Cannot create profiler: " + StringUtils.stringifyException(e)); - return null; - } catch (IllegalArgumentException e) { - LOG.error("Cannot create profiler: " + StringUtils.stringifyException(e)); - return null; - } - - return profiler; - } - - /** - * Creates an instance of the task manager's profiling component. - * - * @param profilerClassName - * the class name of the profiling component to load - * @return an instance of the task manager profiling component or <code>null</code> if an error occurs - */ - @SuppressWarnings("unchecked") - public static TaskManagerProfiler loadTaskManagerProfiler(String profilerClassName, InetAddress jobManagerAddress, - InstanceConnectionInfo instanceConnectionInfo) { - - final Class<? extends TaskManagerProfiler> profilerClass; - try { - profilerClass = (Class<? extends TaskManagerProfiler>) Class.forName(profilerClassName); - } catch (ClassNotFoundException e) { - LOG.error("Cannot find class " + profilerClassName + ": " + StringUtils.stringifyException(e)); - return null; - } - - Constructor<? extends TaskManagerProfiler> constructor = null; - try { - constructor = profilerClass.getConstructor(InetAddress.class, InstanceConnectionInfo.class); - } catch (SecurityException e1) { - LOG.error("Security exception while retrieving constructor for class " + profilerClass.getCanonicalName() - + ".", e1); - return null; - } catch (NoSuchMethodException e1) { - LOG.error("Class " + profilerClass.getCanonicalName() + " does not have a constructor taking a " + - "InetAddress and InstanceConnectionInfo parameter.", e1); - return null; - } - - TaskManagerProfiler profiler = null; - try { - profiler = constructor.newInstance(jobManagerAddress, instanceConnectionInfo); - } catch (IllegalArgumentException e) { - LOG.error("IllegalArgumentException while creating object of class " + profilerClass.getCanonicalName() + - ".", e); - } catch (InstantiationException e) { - LOG.error("Could not instantiate object of class " + profilerClass.getCanonicalName() + ".",e); - } catch (IllegalAccessException e) { - LOG.error("IllegalAccessException while creating object of class " + profilerClass.getCanonicalName() + ".", - e); - } catch (InvocationTargetException e) { - LOG.error("InvocationTargetException while creating object of class " + profilerClass.getCanonicalName() + - ".", e); - } - - return profiler; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/TaskManagerProfiler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/TaskManagerProfiler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/TaskManagerProfiler.java deleted file mode 100644 index 709a05c..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/TaskManagerProfiler.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.profiling; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.taskmanager.Task; - -/** - * This interface must be implemented by profiling components - * for the task manager manager. - */ -public interface TaskManagerProfiler { - - /** - * Registers a {@link org.apache.flink.runtime.taskmanager.Task} object for profiling. - * - * @param task - * task to be register a profiling listener for - * @param jobConfiguration - * the job configuration sent with the task - */ - void registerTask(Task task, Configuration jobConfiguration); - - /** - * Unregisters all previously registered {@link org.apache.flink.runtime.taskmanager.Task} - * objects for the vertex identified by the given ID. - * - * @param id - * the ID of the vertex to unregister the - * {@link org.apache.flink.runtime.taskmanager.Task} objects for - */ - void unregisterTask(ExecutionAttemptID id); - - /** - * Shuts done the task manager's profiling component - * and stops all its internal processes. - */ - void shutdown(); -} http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/EnvironmentThreadSet.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/EnvironmentThreadSet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/EnvironmentThreadSet.java deleted file mode 100644 index d5ce137..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/EnvironmentThreadSet.java +++ /dev/null @@ -1,217 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.profiling.impl; - -import java.lang.management.ThreadInfo; -import java.lang.management.ThreadMXBean; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; - -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.profiling.impl.types.InternalExecutionVertexThreadProfilingData; - -public class EnvironmentThreadSet { - - private static final long NANO_TO_MILLISECONDS = 1000 * 1000; - - private static final long PERCENT = 100; - - - - private final Thread mainThread; - - private final JobVertexID vertexId; - - private final int subtask; - - private final ExecutionAttemptID executionId; - - private final Map<Thread, CPUUtilizationSnapshot> userThreads = new HashMap<Thread, CPUUtilizationSnapshot>(); - - private CPUUtilizationSnapshot mainThreadSnapshot; - - - public EnvironmentThreadSet(ThreadMXBean tmx, Thread mainThread, JobVertexID vertexId, int subtask, ExecutionAttemptID executionId) { - this.mainThread = mainThread; - this.vertexId = vertexId; - this.subtask = subtask; - this.executionId = executionId; - - this.mainThreadSnapshot = createCPUUtilizationSnapshot(tmx, mainThread, System.currentTimeMillis()); - } - - public Thread getMainThread() { - return this.mainThread; - } - - public void addUserThread(ThreadMXBean tmx, Thread thread) { - synchronized (this.userThreads) { - this.userThreads.put(thread, createCPUUtilizationSnapshot(tmx, thread, System.currentTimeMillis())); - } - } - - public void removeUserThread(Thread thread) { - synchronized (this.userThreads) { - this.userThreads.remove(thread); - } - } - - public int getNumberOfUserThreads() { - synchronized (this.userThreads) { - return this.userThreads.size(); - } - } - - private CPUUtilizationSnapshot createCPUUtilizationSnapshot(ThreadMXBean tmx, Thread thread, long timestamp) { - final long threadId = thread.getId(); - - final ThreadInfo threadInfo = tmx.getThreadInfo(threadId); - if(threadInfo == null) { - return null; - } - - return new CPUUtilizationSnapshot(timestamp, - tmx.getThreadCpuTime(threadId) / NANO_TO_MILLISECONDS, - tmx.getThreadUserTime(threadId) / NANO_TO_MILLISECONDS, - threadInfo.getWaitedTime(), - threadInfo.getBlockedTime()); - } - - public InternalExecutionVertexThreadProfilingData captureCPUUtilization(JobID jobID, ThreadMXBean tmx, long timestamp) { - - synchronized (this.userThreads) { - - // Calculate utilization for main thread first - final CPUUtilizationSnapshot newMainThreadSnapshot = createCPUUtilizationSnapshot(tmx, this.mainThread, timestamp); - if(newMainThreadSnapshot == null) { - return null; - } - - final long mainInterval = newMainThreadSnapshot.getTimestamp() - this.mainThreadSnapshot.getTimestamp(); - - if (mainInterval == 0) { - return null; - } - - long cputime = newMainThreadSnapshot.getTotalCPUTime() - this.mainThreadSnapshot.getTotalCPUTime(); - long usrtime = newMainThreadSnapshot.getTotalCPUUserTime() - this.mainThreadSnapshot.getTotalCPUUserTime(); - long systime = cputime - usrtime; - long waitime = newMainThreadSnapshot.getTotalCPUWaitTime() - this.mainThreadSnapshot.getTotalCPUWaitTime(); - long blktime = newMainThreadSnapshot.getTotalCPUBlockTime() - - this.mainThreadSnapshot.getTotalCPUBlockTime(); - - int sumUsrTime = (int) ((usrtime * PERCENT) / mainInterval); - int sumSysTime = (int) ((systime * PERCENT) / mainInterval); - int sumBlkTime = (int) ((blktime * PERCENT) / mainInterval); - int sumWaiTime = (int) ((waitime * PERCENT) / mainInterval); - - // Update snapshot - this.mainThreadSnapshot = newMainThreadSnapshot; - - if (!this.userThreads.isEmpty()) { - - final Iterator<Thread> it = this.userThreads.keySet().iterator(); - int divisor = this.userThreads.size(); - while (it.hasNext()) { - - final Thread userThread = it.next(); - final CPUUtilizationSnapshot newUtilizationSnaphot = createCPUUtilizationSnapshot(tmx, userThread, - timestamp); - final CPUUtilizationSnapshot oldUtilizationSnapshot = this.userThreads.get(userThread); - - long interval = newUtilizationSnaphot.getTimestamp() - oldUtilizationSnapshot.getTimestamp(); - - if (interval == 0) { - --divisor; - continue; - } - - cputime = newUtilizationSnaphot.getTotalCPUTime() - oldUtilizationSnapshot.getTotalCPUTime(); - usrtime = newUtilizationSnaphot.getTotalCPUUserTime() - oldUtilizationSnapshot.getTotalCPUUserTime(); - systime = cputime - usrtime; - waitime = newUtilizationSnaphot.getTotalCPUWaitTime() - oldUtilizationSnapshot.getTotalCPUWaitTime(); - blktime = newUtilizationSnaphot.getTotalCPUBlockTime() - oldUtilizationSnapshot.getTotalCPUBlockTime(); - - sumUsrTime += (int) ((usrtime * PERCENT) / interval); - sumSysTime += (int) ((systime * PERCENT) / interval); - sumBlkTime += (int) ((blktime * PERCENT) / interval); - sumWaiTime += (int) ((waitime * PERCENT) / interval); - - // Update snapshot - this.userThreads.put(userThread, newUtilizationSnaphot); - } - - sumUsrTime /= (divisor + 1); - sumSysTime /= (divisor + 1); - sumBlkTime /= (divisor + 1); - sumWaiTime /= (divisor + 1); - } - - return new InternalExecutionVertexThreadProfilingData(jobID, this.vertexId, this.subtask, this.executionId, - (int) mainInterval, sumUsrTime, sumSysTime, sumBlkTime, sumWaiTime); - } - } - - // -------------------------------------------------------------------------------------------- - - private class CPUUtilizationSnapshot { - - private final long timestamp; - - private final long totalCPUTime; - - private final long totalCPUUserTime; - - private final long totalCPUWaitTime; - - private final long totalCPUBlockTime; - - public CPUUtilizationSnapshot(long timestamp, long totalCPUTime, long totalCPUUserTime, long totalCPUWaitTime, - long totalCPUBlockTime) { - this.timestamp = timestamp; - this.totalCPUTime = totalCPUTime; - this.totalCPUUserTime = totalCPUUserTime; - this.totalCPUWaitTime = totalCPUWaitTime; - this.totalCPUBlockTime = totalCPUBlockTime; - } - - public long getTimestamp() { - return this.timestamp; - } - - public long getTotalCPUTime() { - return this.totalCPUTime; - } - - public long getTotalCPUUserTime() { - return this.totalCPUUserTime; - } - - public long getTotalCPUWaitTime() { - return this.totalCPUWaitTime; - } - - public long getTotalCPUBlockTime() { - return this.totalCPUBlockTime; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/InstanceProfiler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/InstanceProfiler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/InstanceProfiler.java deleted file mode 100644 index d973017..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/InstanceProfiler.java +++ /dev/null @@ -1,342 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.profiling.impl; - -import java.io.BufferedReader; -import java.io.FileReader; -import java.io.IOException; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.flink.runtime.profiling.ProfilingException; -import org.apache.flink.runtime.profiling.impl.types.InternalInstanceProfilingData; -import org.apache.flink.util.StringUtils; - -public class InstanceProfiler { - - static final String PROC_MEMINFO = "/proc/meminfo"; - - static final String PROC_STAT = "/proc/stat"; - - static final String PROC_NET_DEV = "/proc/net/dev"; - - private static final String LOOPBACK_INTERFACE_NAME = "lo"; - - private static final Pattern CPU_PATTERN = Pattern - .compile("^cpu\\s+(\\d+)\\s+(\\d+)\\s+(\\d+)\\s+(\\d+)\\s+(\\d+)\\s+(\\d+)\\s+(\\d+).+$"); - - private static final Pattern NETWORK_PATTERN = Pattern - .compile("^\\s*(\\w+):\\s*(\\d+)\\s+\\d+\\s+\\d+\\s+\\d+\\s+\\d+\\s+\\d+\\s+\\d+\\s+\\d+\\s+(\\d+).+$"); - - private static final Pattern MEMORY_PATTERN = Pattern.compile("^\\w+:\\s*(\\d+)\\s+kB$"); - - private static final int PERCENT = 100; - - private final String instancePath; - - private long lastTimestamp = 0; - - // CPU related variables - private long lastCpuUser = 0; - - private long lastCpuNice = 0; - - private long lastCpuSys = 0; - - private long lastCpuIdle = 0; - - private long lastCpuIOWait = 0; - - private long lastCpuIrq = 0; - - private long lastCpuSoftirq = 0; - - // Network related variables - private long lastReceivedBytes = 0; - - private long lastTramsmittedBytes = 0; - - private long firstTimestamp; - - public InstanceProfiler(String instancePath) - throws ProfilingException { - - this.instancePath = instancePath; - this.firstTimestamp = System.currentTimeMillis(); - // Initialize counters by calling generateProfilingData once and ignore the return value - generateProfilingData(this.firstTimestamp); - } - - public InternalInstanceProfilingData generateProfilingData(long timestamp) throws ProfilingException { - - final long profilingInterval = timestamp - lastTimestamp; - - final InternalInstanceProfilingData profilingData = new InternalInstanceProfilingData( - this.instancePath, (int) profilingInterval); - - updateCPUUtilization(profilingData); - updateMemoryUtilization(profilingData); - updateNetworkUtilization(profilingData); - - // Update timestamp - this.lastTimestamp = timestamp; - - return profilingData; - } - - private void updateMemoryUtilization(InternalInstanceProfilingData profilingData) throws ProfilingException { - - BufferedReader in = null; - - try { - - in = new BufferedReader(new FileReader(PROC_MEMINFO)); - - long freeMemory = 0; - long totalMemory = 0; - long bufferedMemory = 0; - long cachedMemory = 0; - long cachedSwapMemory = 0; - - int count = 0; - String output; - while ((output = in.readLine()) != null) { - - switch (count) { - case 0: // Total memory - totalMemory = extractMemoryValue(output); - break; - case 1: // Free memory - freeMemory = extractMemoryValue(output); - break; - case 2: // Buffers - bufferedMemory = extractMemoryValue(output); - break; - case 3: // Cache - cachedMemory = extractMemoryValue(output); - break; - case 4: - cachedSwapMemory = extractMemoryValue(output); - break; - default: - break; - } - - ++count; - } - - profilingData.setTotalMemory(totalMemory); - profilingData.setFreeMemory(freeMemory); - profilingData.setBufferedMemory(bufferedMemory); - profilingData.setCachedMemory(cachedMemory); - profilingData.setCachedSwapMemory(cachedSwapMemory); - - } catch (IOException ioe) { - throw new ProfilingException("Error while reading network utilization: " - + StringUtils.stringifyException(ioe)); - } finally { - if (in != null) { - try { - in.close(); - } catch (IOException e) { - } - } - } - - } - - private long extractMemoryValue(String line) throws ProfilingException { - - final Matcher matcher = MEMORY_PATTERN.matcher(line); - if (!matcher.matches()) { - throw new ProfilingException("Cannot extract memory data for profiling from line " + line); - } - - return Long.parseLong(matcher.group(1)); - } - - private void updateNetworkUtilization(InternalInstanceProfilingData profilingData) throws ProfilingException { - - BufferedReader in = null; - - try { - - in = new BufferedReader(new FileReader(PROC_NET_DEV)); - - long receivedSum = 0; - long transmittedSum = 0; - - String output; - while ((output = in.readLine()) != null) { - final Matcher networkMatcher = NETWORK_PATTERN.matcher(output); - if (!networkMatcher.matches()) { - continue; - } - /* - * Extract information according to - * http://linuxdevcenter.com/pub/a/linux/2000/11/16/LinuxAdmin.html - */ - - if (LOOPBACK_INTERFACE_NAME.equals(networkMatcher.group(1))) { - continue; - } - - receivedSum += Long.parseLong(networkMatcher.group(2)); - transmittedSum += Long.parseLong(networkMatcher.group(3)); - } - - in.close(); - in = null; - - profilingData.setReceivedBytes(receivedSum - this.lastReceivedBytes); - profilingData.setTransmittedBytes(transmittedSum - this.lastTramsmittedBytes); - - // Store values for next call - this.lastReceivedBytes = receivedSum; - this.lastTramsmittedBytes = transmittedSum; - - } catch (IOException ioe) { - throw new ProfilingException("Error while reading network utilization: " - + StringUtils.stringifyException(ioe)); - } catch (NumberFormatException nfe) { - throw new ProfilingException("Error while reading network utilization: " - + StringUtils.stringifyException(nfe)); - } finally { - if (in != null) { - try { - in.close(); - } catch (IOException e) { - } - } - } - } - - private void updateCPUUtilization(InternalInstanceProfilingData profilingData) throws ProfilingException { - - BufferedReader in = null; - - try { - - in = new BufferedReader(new FileReader(PROC_STAT)); - final String output = in.readLine(); - if (output == null) { - throw new ProfilingException("Cannot read CPU utilization, return value is null"); - } - - in.close(); - in = null; - - final Matcher cpuMatcher = CPU_PATTERN.matcher(output); - if (!cpuMatcher.matches()) { - throw new ProfilingException("Cannot extract CPU utilization from output \"" + output + "\""); - } - - /* - * Extract the information from the read line according to - * http://www.linuxhowtos.org/System/procstat.htm - */ - - final long cpuUser = Long.parseLong(cpuMatcher.group(1)); - final long cpuNice = Long.parseLong(cpuMatcher.group(2)); - final long cpuSys = Long.parseLong(cpuMatcher.group(3)); - final long cpuIdle = Long.parseLong(cpuMatcher.group(4)); - final long cpuIOWait = Long.parseLong(cpuMatcher.group(5)); - final long cpuIrq = Long.parseLong(cpuMatcher.group(6)); - final long cpuSoftirq = Long.parseLong(cpuMatcher.group(7)); - - // Calculate deltas - final long deltaCpuUser = cpuUser - this.lastCpuUser; - final long deltaCpuNice = cpuNice - this.lastCpuNice; - final long deltaCpuSys = cpuSys - this.lastCpuSys; - final long deltaCpuIdle = cpuIdle - this.lastCpuIdle; - final long deltaCpuIOWait = cpuIOWait - this.lastCpuIOWait; - final long deltaCpuIrq = cpuIrq - this.lastCpuIrq; - final long deltaCpuSoftirq = cpuSoftirq - this.lastCpuSoftirq; - final long deltaSum = deltaCpuUser + deltaCpuNice + deltaCpuSys + deltaCpuIdle + deltaCpuIOWait - + deltaCpuIrq + deltaCpuSoftirq; - - // TODO: Fix deltaSum = 0 situation - - // Set the percentage values for the profiling data object - /* - * profilingData.setIdleCPU((int)((deltaCpuIdle*PERCENT)/deltaSum)); - * profilingData.setUserCPU((int)((deltaCpuUser*PERCENT)/deltaSum)); - * profilingData.setSystemCPU((int)((deltaCpuSys*PERCENT)/deltaSum)); - * profilingData.setIoWaitCPU((int)((deltaCpuIOWait*PERCENT)/deltaSum)); - * profilingData.setHardIrqCPU((int)((deltaCpuIrq*PERCENT)/deltaSum)); - * profilingData.setSoftIrqCPU((int)((deltaCpuSoftirq*PERCENT)/deltaSum)); - */ - // TODO: bad quick fix - if (deltaSum > 0) { - profilingData.setIdleCPU((int) ((deltaCpuIdle * PERCENT) / deltaSum)); - profilingData.setUserCPU((int) ((deltaCpuUser * PERCENT) / deltaSum)); - profilingData.setSystemCPU((int) ((deltaCpuSys * PERCENT) / deltaSum)); - profilingData.setIoWaitCPU((int) ((deltaCpuIOWait * PERCENT) / deltaSum)); - profilingData.setHardIrqCPU((int) ((deltaCpuIrq * PERCENT) / deltaSum)); - profilingData.setSoftIrqCPU((int) ((deltaCpuSoftirq * PERCENT) / deltaSum)); - } else { - profilingData.setIdleCPU((int) (deltaCpuIdle)); - profilingData.setUserCPU((int) (deltaCpuUser)); - profilingData.setSystemCPU((int) (deltaCpuSys)); - profilingData.setIoWaitCPU((int) (deltaCpuIOWait)); - profilingData.setHardIrqCPU((int) (deltaCpuIrq)); - profilingData.setSoftIrqCPU((int) (deltaCpuSoftirq)); - } - // Store values for next call - this.lastCpuUser = cpuUser; - this.lastCpuNice = cpuNice; - this.lastCpuSys = cpuSys; - this.lastCpuIdle = cpuIdle; - this.lastCpuIOWait = cpuIOWait; - this.lastCpuIrq = cpuIrq; - this.lastCpuSoftirq = cpuSoftirq; - - } catch (IOException ioe) { - throw new ProfilingException("Error while reading CPU utilization: " + StringUtils.stringifyException(ioe)); - } catch (NumberFormatException nfe) { - throw new ProfilingException("Error while reading CPU utilization: " + StringUtils.stringifyException(nfe)); - } finally { - if (in != null) { - try { - in.close(); - } catch (IOException e) { - } - } - } - - } - - /** - * @return InternalInstanceProfilingData ProfilingData for the instance from execution-start to currentTime - * @throws ProfilingException - */ - public InternalInstanceProfilingData generateCheckpointProfilingData() throws ProfilingException { - final long profilingInterval = System.currentTimeMillis() - this.firstTimestamp; - - final InternalInstanceProfilingData profilingData = new InternalInstanceProfilingData( - this.instancePath, (int) profilingInterval); - - updateCPUUtilization(profilingData); - updateMemoryUtilization(profilingData); - updateNetworkUtilization(profilingData); - - return profilingData; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/JobProfilingData.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/JobProfilingData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/JobProfilingData.java deleted file mode 100644 index ce7db25..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/JobProfilingData.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.profiling.impl; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; - -import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.runtime.instance.Instance; -import org.apache.flink.runtime.instance.SimpleSlot; -import org.apache.flink.runtime.profiling.impl.types.InternalInstanceProfilingData; -import org.apache.flink.runtime.profiling.types.InstanceSummaryProfilingEvent; - -public class JobProfilingData { - - private final ExecutionGraph executionGraph; - - private final long profilingStart; - - private final Map<String, InternalInstanceProfilingData> collectedInstanceProfilingData = new - HashMap<String, InternalInstanceProfilingData>(); - - - public JobProfilingData(ExecutionGraph executionGraph) { - this.executionGraph = executionGraph; - this.profilingStart = System.currentTimeMillis(); - } - - - public long getProfilingStart() { - return this.profilingStart; - } - - public ExecutionGraph getExecutionGraph() { - return this.executionGraph; - } - - public boolean addIfInstanceIsAllocatedByJob(InternalInstanceProfilingData instanceProfilingData) { - - for (ExecutionVertex executionVertex : this.executionGraph.getAllExecutionVertices()) { - SimpleSlot slot = executionVertex.getCurrentAssignedResource(); - if (slot != null && slot.getInstance().getPath().equals( - instanceProfilingData.getInstancePath())) - { - this.collectedInstanceProfilingData.put(instanceProfilingData.getInstancePath(), instanceProfilingData); - return true; - } - } - - return false; - } - - public InstanceSummaryProfilingEvent getInstanceSummaryProfilingData(long timestamp) { - - final Set<Instance> tempSet = new HashSet<Instance>(); - - for (ExecutionVertex executionVertex : this.executionGraph.getAllExecutionVertices()) { - SimpleSlot slot = executionVertex.getCurrentAssignedResource(); - if (slot != null) { - tempSet.add(slot.getInstance()); - } - } - - // Now compare the size of the collected data set and the allocated instance set. - // If their sizes are equal we can issue an instance summary. - if (tempSet.size() != this.collectedInstanceProfilingData.size()) { - return null; - } - - return constructInstanceSummary(timestamp); - } - - private InstanceSummaryProfilingEvent constructInstanceSummary(long timestamp) { - - final int numberOfInstances = this.collectedInstanceProfilingData.size(); - - final Iterator<String> instanceIterator = this.collectedInstanceProfilingData.keySet().iterator(); - - long freeMemorySum = 0; - long totalMemorySum = 0; - long bufferedMemorySum = 0; - long cachedMemorySum = 0; - long cachedSwapMemorySum = 0; - - int ioWaitCPUSum = 0; - int idleCPUSum = 0; - int profilingIntervalSum = 0; - int systemCPUSum = 0; - int hardIrqCPUSum = 0; - int softIrqCPUSum = 0; - - int userCPUSum = 0; - long receivedBytesSum = 0; - long transmittedBytesSum = 0; - - // Sum up the individual values - while (instanceIterator.hasNext()) { - - final InternalInstanceProfilingData profilingData = this.collectedInstanceProfilingData.get(instanceIterator.next()); - - freeMemorySum += profilingData.getFreeMemory(); - ioWaitCPUSum += profilingData.getIOWaitCPU(); - idleCPUSum += profilingData.getIdleCPU(); - profilingIntervalSum += profilingData.getProfilingInterval(); - systemCPUSum += profilingData.getSystemCPU(); - hardIrqCPUSum += profilingData.getHardIrqCPU(); - softIrqCPUSum += profilingData.getSoftIrqCPU(); - totalMemorySum += profilingData.getTotalMemory(); - userCPUSum += profilingData.getUserCPU(); - receivedBytesSum += profilingData.getReceivedBytes(); - transmittedBytesSum += profilingData.getTransmittedBytes(); - bufferedMemorySum += profilingData.getBufferedMemory(); - cachedMemorySum += profilingData.getCachedMemory(); - cachedSwapMemorySum += profilingData.getCachedSwapMemory(); - } - - final InstanceSummaryProfilingEvent instanceSummary = new InstanceSummaryProfilingEvent( - profilingIntervalSum / numberOfInstances, - ioWaitCPUSum / numberOfInstances, - idleCPUSum / numberOfInstances, - userCPUSum / numberOfInstances, - systemCPUSum / numberOfInstances, - hardIrqCPUSum / numberOfInstances, - softIrqCPUSum / numberOfInstances, - totalMemorySum / numberOfInstances, - freeMemorySum / numberOfInstances, - bufferedMemorySum / numberOfInstances, - cachedMemorySum / numberOfInstances, - cachedSwapMemorySum / numberOfInstances, - receivedBytesSum / numberOfInstances, - transmittedBytesSum / numberOfInstances, - this.executionGraph.getJobID(), timestamp, (timestamp - this.profilingStart)); - - this.collectedInstanceProfilingData.clear(); - - return instanceSummary; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalExecutionVertexProfilingData.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalExecutionVertexProfilingData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalExecutionVertexProfilingData.java deleted file mode 100644 index 98ccfcf..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalExecutionVertexProfilingData.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.profiling.impl.types; - -import java.io.IOException; - -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.jobgraph.JobVertexID; - -public abstract class InternalExecutionVertexProfilingData implements InternalProfilingData { - - private final JobID jobId; - - private final JobVertexID vertexId; - - private int subtask; - - private final ExecutionAttemptID executionId; - - - public InternalExecutionVertexProfilingData() { - this.jobId = new JobID(); - this.vertexId = new JobVertexID(); - this.executionId = new ExecutionAttemptID(); - this.subtask = -1; - } - - - public InternalExecutionVertexProfilingData(JobID jobId, JobVertexID vertexId, int subtask, ExecutionAttemptID executionId) { - this.jobId = jobId; - this.vertexId = vertexId; - this.subtask = subtask; - this.executionId = executionId; - } - - // -------------------------------------------------------------------------------------------- - - public JobID getJobID() { - return this.jobId; - } - - public JobVertexID getVertexId() { - return vertexId; - } - - public int getSubtask() { - return subtask; - } - - public ExecutionAttemptID getExecutionAttemptId() { - return this.executionId; - } - - // -------------------------------------------------------------------------------------------- - - @Override - public void read(DataInputView in) throws IOException { - this.jobId.read(in); - this.vertexId.read(in); - this.executionId.read(in); - this.subtask = in.readInt(); - } - - @Override - public void write(DataOutputView out) throws IOException { - this.jobId.write(out); - this.vertexId.write(out); - this.executionId.write(out); - out.writeInt(this.subtask); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalExecutionVertexThreadProfilingData.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalExecutionVertexThreadProfilingData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalExecutionVertexThreadProfilingData.java deleted file mode 100644 index 92ce916..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalExecutionVertexThreadProfilingData.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.profiling.impl.types; - -import java.io.IOException; - -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.jobgraph.JobVertexID; - -public class InternalExecutionVertexThreadProfilingData extends InternalExecutionVertexProfilingData { - - private int profilingInterval = 0; - - private int userTime = 0; - - private int systemTime = 0; - - private int blockedTime = 0; - - private int waitedTime = 0; - - public InternalExecutionVertexThreadProfilingData(JobID jobID, JobVertexID vertexId, int subtask, ExecutionAttemptID executionId, - int profilingInterval, int userTime, int systemTime, int blockedTime, int waitedTime) - { - super(jobID, vertexId, subtask, executionId); - - this.profilingInterval = profilingInterval; - this.userTime = userTime; - this.systemTime = systemTime; - this.blockedTime = blockedTime; - this.waitedTime = waitedTime; - } - - public InternalExecutionVertexThreadProfilingData() {} - - @Override - public void read(DataInputView in) throws IOException { - super.read(in); - - this.profilingInterval = in.readInt(); - this.userTime = in.readInt(); - this.systemTime = in.readInt(); - this.blockedTime = in.readInt(); - this.waitedTime = in.readInt(); - } - - @Override - public void write(DataOutputView out) throws IOException { - super.write(out); - - out.writeInt(this.profilingInterval); - out.writeInt(this.userTime); - out.writeInt(this.systemTime); - out.writeInt(this.blockedTime); - out.writeInt(this.waitedTime); - } - - public int getBlockedTime() { - return this.blockedTime; - } - - public int getProfilingInterval() { - return this.profilingInterval; - } - - public int getSystemTime() { - return this.systemTime; - } - - public int getUserTime() { - return this.userTime; - } - - public int getWaitedTime() { - return this.waitedTime; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalInstanceProfilingData.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalInstanceProfilingData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalInstanceProfilingData.java deleted file mode 100644 index 145d11c..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalInstanceProfilingData.java +++ /dev/null @@ -1,244 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.profiling.impl.types; - -import java.io.IOException; - -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; - -public class InternalInstanceProfilingData implements InternalProfilingData { - - private String instancePath; - - private int profilingInterval; - - private int ioWaitCPU; - - private int idleCPU; - - private int userCPU; - - private int systemCPU; - - private int hardIrqCPU; - - private int softIrqCPU; - - private long totalMemory; - - private long freeMemory; - - private long bufferedMemory; - - private long cachedMemory; - - private long cachedSwapMemory; - - private long receivedBytes; - - private long transmittedBytes; - - public InternalInstanceProfilingData() { - this.freeMemory = -1; - this.ioWaitCPU = -1; - this.idleCPU = -1; - this.instancePath = ""; - this.profilingInterval = -1; - this.systemCPU = -1; - this.totalMemory = -1; - this.bufferedMemory = -1; - this.cachedMemory = -1; - this.cachedSwapMemory = -1; - this.userCPU = -1; - this.receivedBytes = -1; - this.transmittedBytes = -1; - } - - public InternalInstanceProfilingData(String instancePath, int profilingInterval) { - - this.instancePath = instancePath; - this.profilingInterval = profilingInterval; - this.freeMemory = -1; - this.ioWaitCPU = -1; - this.idleCPU = -1; - this.systemCPU = -1; - this.totalMemory = -1; - this.bufferedMemory = -1; - this.cachedMemory = -1; - this.cachedSwapMemory = -1; - this.userCPU = -1; - this.receivedBytes = -1; - this.transmittedBytes = -1; - } - - public long getFreeMemory() { - return this.freeMemory; - } - - public int getIOWaitCPU() { - return this.ioWaitCPU; - } - - public int getIdleCPU() { - return this.idleCPU; - } - - public int getHardIrqCPU() { - return this.hardIrqCPU; - } - - public int getSoftIrqCPU() { - return this.softIrqCPU; - } - - public String getInstancePath() { - return this.instancePath; - } - - public int getProfilingInterval() { - return this.profilingInterval; - } - - public int getSystemCPU() { - return this.systemCPU; - } - - public long getTotalMemory() { - return this.totalMemory; - } - - public long getBufferedMemory() { - return this.bufferedMemory; - } - - public long getCachedMemory() { - return this.cachedMemory; - } - - public long getCachedSwapMemory() { - return this.cachedSwapMemory; - } - - public int getUserCPU() { - return this.userCPU; - } - - public long getReceivedBytes() { - return this.receivedBytes; - } - - public long getTransmittedBytes() { - return this.transmittedBytes; - } - - @Override - public void read(DataInputView in) throws IOException { - - this.freeMemory = in.readLong(); - this.ioWaitCPU = in.readInt(); - this.idleCPU = in.readInt(); - this.instancePath = in.readUTF(); - this.profilingInterval = in.readInt(); - this.systemCPU = in.readInt(); - this.totalMemory = in.readLong(); - this.bufferedMemory = in.readLong(); - this.cachedMemory = in.readLong(); - this.cachedSwapMemory = in.readLong(); - this.userCPU = in.readInt(); - this.receivedBytes = in.readLong(); - this.transmittedBytes = in.readLong(); - this.hardIrqCPU = in.readInt(); - this.softIrqCPU = in.readInt(); - - } - - @Override - public void write(DataOutputView out) throws IOException { - - out.writeLong(this.freeMemory); - out.writeInt(this.ioWaitCPU); - out.writeInt(this.idleCPU); - out.writeUTF(instancePath); - out.writeInt(this.profilingInterval); - out.writeInt(this.systemCPU); - out.writeLong(this.totalMemory); - out.writeLong(this.bufferedMemory); - out.writeLong(this.cachedMemory); - out.writeLong(this.cachedSwapMemory); - out.writeInt(this.userCPU); - out.writeLong(this.receivedBytes); - out.writeLong(this.transmittedBytes); - out.writeInt(this.hardIrqCPU); - out.writeInt(this.softIrqCPU); - - } - - public void setFreeMemory(long freeMemory) { - this.freeMemory = freeMemory; - } - - public void setIoWaitCPU(int ioWaitCPU) { - this.ioWaitCPU = ioWaitCPU; - } - - public void setIdleCPU(int idleCPU) { - this.idleCPU = idleCPU; - } - - public void setSystemCPU(int systemCPU) { - this.systemCPU = systemCPU; - } - - public void setHardIrqCPU(int hardIrqCPU) { - this.hardIrqCPU = hardIrqCPU; - } - - public void setSoftIrqCPU(int softIrqCPU) { - this.softIrqCPU = softIrqCPU; - } - - public void setTotalMemory(long totalMemory) { - this.totalMemory = totalMemory; - } - - public void setBufferedMemory(long bufferedMemory) { - this.bufferedMemory = bufferedMemory; - } - - public void setCachedMemory(long cachedMemory) { - this.cachedMemory = cachedMemory; - } - - public void setCachedSwapMemory(long cachedSwapMemory) { - this.cachedSwapMemory = cachedSwapMemory; - } - - public void setUserCPU(int userCPU) { - this.userCPU = userCPU; - } - - public void setReceivedBytes(long receivedBytes) { - this.receivedBytes = receivedBytes; - } - - public void setTransmittedBytes(long transmittedBytes) { - this.transmittedBytes = transmittedBytes; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalProfilingData.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalProfilingData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalProfilingData.java deleted file mode 100644 index c6f53d1..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalProfilingData.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.profiling.impl.types; - -import org.apache.flink.core.io.IOReadableWritable; - -public interface InternalProfilingData extends IOReadableWritable {} http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/ProfilingDataContainer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/ProfilingDataContainer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/ProfilingDataContainer.java deleted file mode 100644 index b233b84..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/ProfilingDataContainer.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.profiling.impl.types; - -import java.io.IOException; -import java.util.ArrayDeque; -import java.util.Iterator; -import java.util.Queue; - -import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.util.StringUtils; - -public class ProfilingDataContainer implements IOReadableWritable { - - private final Queue<InternalProfilingData> queuedProfilingData = new ArrayDeque<InternalProfilingData>(); - - public void addProfilingData(InternalProfilingData profilingData) { - - if (profilingData == null) { - return; - } - - this.queuedProfilingData.add(profilingData); - } - - public void clear() { - this.queuedProfilingData.clear(); - } - - @SuppressWarnings("unchecked") - @Override - public void read(DataInputView in) throws IOException { - - final int numberOfRecords = in.readInt(); - for (int i = 0; i < numberOfRecords; i++) { - final String className = StringUtils.readNullableString(in); - - Class<? extends InternalProfilingData> clazz; - try { - clazz = (Class<? extends InternalProfilingData>) Class.forName(className); - } catch (Exception e) { - throw new IOException(e); - } - - InternalProfilingData profilingData ; - try { - profilingData = clazz.newInstance(); - } catch (Exception e) { - throw new IOException(e); - } - - // Restore internal state - profilingData.read(in); - - this.queuedProfilingData.add(profilingData); - } - } - - public int size() { - return this.queuedProfilingData.size(); - } - - public boolean isEmpty() { - return this.queuedProfilingData.isEmpty(); - } - - public Iterator<InternalProfilingData> getIterator() { - return this.queuedProfilingData.iterator(); - } - - @Override - public void write(DataOutputView out) throws IOException { - - // Write the number of records - out.writeInt(this.queuedProfilingData.size()); - // Write the records themselves - for (InternalProfilingData profilingData : this.queuedProfilingData) { - StringUtils.writeNullableString(profilingData.getClass().getName(), out); - profilingData.write(out); - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/InstanceProfilingEvent.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/InstanceProfilingEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/InstanceProfilingEvent.java deleted file mode 100644 index 7de9252..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/InstanceProfilingEvent.java +++ /dev/null @@ -1,399 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.profiling.types; - -import java.io.IOException; - -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.api.common.JobID; - -/** - * Instance profiling events are a special subclass of profiling events. They contain profiling information about the - * utilization of a particular instance during a job execution. - */ -public abstract class InstanceProfilingEvent extends ProfilingEvent { - - private static final long serialVersionUID = 5964092674722506040L; - - /** - * The interval of time this profiling event covers in milliseconds. - */ - private int profilingInterval; - - /** - * The percentage of time the CPU(s) spent in state IOWAIT during the profiling interval. - */ - private int ioWaitCPU; - - /** - * The percentage of time the CPU(s) spent in state IDLE during the profiling interval. - */ - private int idleCPU; - - /** - * The percentage of time the CPU(s) spent in state USER during the profiling interval. - */ - private int userCPU; - - /** - * The percentage of time the CPU(s) spent in state SYSTEM during the profiling interval. - */ - private int systemCPU; - - /** - * The percentage of time the CPU(s) spent in state HARD_IRQ during the profiling interval. - */ - private int hardIrqCPU; - - /** - * The percentage of time the CPU(s) spent in state SOFT_IRQ during the profiling interval. - */ - private int softIrqCPU; - - /** - * The total amount of this instance's main memory in bytes. - */ - private long totalMemory; - - /** - * The free amount of this instance's main memory in bytes. - */ - private long freeMemory; - - /** - * The amount of main memory the instance uses for file buffers. - */ - private long bufferedMemory; - - /** - * The amount of main memory the instance uses as cache memory. - */ - private long cachedMemory; - - /** - * The amount of main memory the instance uses for cached swaps. - */ - private long cachedSwapMemory; - - /** - * The number of bytes received via network during the profiling interval. - */ - private long receivedBytes; - - /** - * The number of bytes transmitted via network during the profiling interval. - */ - private long transmittedBytes; - - /** - * Constructs a new instance profiling event. - * - * @param profilingInterval - * the interval of time this profiling event covers in milliseconds - * @param ioWaitCPU - * the percentage of time the CPU(s) spent in state IOWAIT during the profiling interval - * @param idleCPU - * the percentage of time the CPU(s) spent in state IDLE during the profiling interval - * @param userCPU - * the percentage of time the CPU(s) spent in state USER during the profiling interval - * @param systemCPU - * the percentage of time the CPU(s) spent in state SYSTEM during the profiling interval - * @param hardIrqCPU - * the percentage of time the CPU(s) spent in state HARD_IRQ during the profiling interval - * @param softIrqCPU - * the percentage of time the CPU(s) spent in state SOFT_IRQ during the profiling interval - * @param totalMemory - * the total amount of this instance's main memory in bytes - * @param freeMemory - * the free amount of this instance's main memory in bytes - * @param bufferedMemory - * the amount of main memory the instance uses for file buffers - * @param cachedMemory - * the amount of main memory the instance uses as cache memory - * @param cachedSwapMemory - * The amount of main memory the instance uses for cached swaps - * @param receivedBytes - * the number of bytes received via network during the profiling interval - * @param transmittedBytes - * the number of bytes transmitted via network during the profiling interval - * @param jobID - * the ID of this job this profiling event belongs to - * @param timestamp - * the time stamp of this profiling event's creation - * @param profilingTimestamp - * the time stamp relative to the beginning of the job's execution - */ - public InstanceProfilingEvent(final int profilingInterval, final int ioWaitCPU, final int idleCPU, - final int userCPU, final int systemCPU, final int hardIrqCPU, final int softIrqCPU, final long totalMemory, - final long freeMemory, final long bufferedMemory, final long cachedMemory, final long cachedSwapMemory, - final long receivedBytes, final long transmittedBytes, final JobID jobID, final long timestamp, - final long profilingTimestamp) { - - super(jobID, timestamp, profilingTimestamp); - - this.profilingInterval = profilingInterval; - - this.ioWaitCPU = ioWaitCPU; - this.idleCPU = idleCPU; - this.userCPU = userCPU; - this.systemCPU = systemCPU; - this.hardIrqCPU = hardIrqCPU; - this.softIrqCPU = softIrqCPU; - - this.totalMemory = totalMemory; - this.freeMemory = freeMemory; - this.bufferedMemory = bufferedMemory; - this.cachedMemory = cachedMemory; - this.cachedSwapMemory = cachedSwapMemory; - - this.receivedBytes = receivedBytes; - this.transmittedBytes = transmittedBytes; - } - - /** - * Default constructor for serialization/deserialization. - */ - public InstanceProfilingEvent() { - super(); - } - - // -------------------------------------------------------------------------------------------- - - /** - * Returns the interval of time this profiling event covers in milliseconds. - * - * @return the interval of time this profiling event covers in milliseconds - */ - public final int getProfilingInterval() { - return this.profilingInterval; - } - - /** - * Returns the total amount of memory of the corresponding instance. - * - * @return the total amount of memory in bytes - */ - public final long getTotalMemory() { - return this.totalMemory; - } - - /** - * Returns the amount of free memory of the corresponding instance. - * - * @return the amount of free memory in bytes. - */ - public final long getFreeMemory() { - return this.freeMemory; - } - - /** - * Returns the amount of memory, in bytes, used for file buffers. - * - * @return the amount of memory used for file buffers in bytes - */ - public final long getBufferedMemory() { - return this.bufferedMemory; - } - - /** - * Returns the amount of memory, in bytes, used as cache memory. - * - * @return the amount of memory used as cache memory in bytes - */ - public final long getCachedMemory() { - return this.cachedMemory; - } - - /** - * Returns the amount of swap, in bytes, used as cache memory. - * - * @return the amount of, in bytes, used as cache memory - */ - public final long getCachedSwapMemory() { - return this.cachedSwapMemory; - } - - /** - * Returns the percentage of time the CPU(s) spent in state USER during the profiling interval. - * - * @return the percentage of time the CPU(s) spent in state USER during the profiling interval - */ - public final int getUserCPU() { - return this.userCPU; - } - - /** - * Returns the percentage of time the CPU(s) spent in state SYSTEM during the profiling interval. - * - * @return the percentage of time the CPU(s) spent in state SYSTEM during the profiling interval - */ - public final int getSystemCPU() { - return this.systemCPU; - } - - /** - * Returns the percentage of time the CPU(s) spent in state IDLE during the profiling interval. Prior to Linux - * 2.5.41, this includes IO-wait time. - * - * @return the percentage of time the CPU(s) spent in state IDLE during the profiling interval - */ - public final int getIdleCPU() { - return this.idleCPU; - } - - /** - * Returns the percentage of time the CPU(s) spent in state IOWAIT during the profiling interval. Prior to Linux - * 2.5.41, included in idle. - * - * @return the percentage of time the CPU(s) spent in state IOWAIT during the profiling interval. - */ - public final int getIOWaitCPU() { - return this.ioWaitCPU; - } - - /** - * Returns the percentage of time the CPU(s) spent in state HARD_IRQ during the profiling interval. - * - * @return the percentage of time the CPU(s) spent in state HARD_IRQ during the profiling interval - */ - public final int getHardIrqCPU() { - return this.hardIrqCPU; - } - - /** - * Returns the percentage of time the CPU(s) spent in state SOFT_IRQ during the profiling interval. - * - * @return the percentage of time the CPU(s) spent in state SOFT_IRQ during the profiling interval - */ - public final int getSoftIrqCPU() { - return this.softIrqCPU; - } - - /** - * Returns the number of bytes received via network during the profiling interval. - * - * @return the number of bytes received via network during the profiling interval - */ - public final long getReceivedBytes() { - return this.receivedBytes; - } - - /** - * Returns the number of bytes transmitted via network during the profiling interval. - * - * @return the number of bytes transmitted via network during the profiling interval - */ - public final long getTransmittedBytes() { - return this.transmittedBytes; - } - - // -------------------------------------------------------------------------------------------- - // Serialization - // -------------------------------------------------------------------------------------------- - - @Override - public void read(final DataInputView in) throws IOException { - super.read(in); - - this.profilingInterval = in.readInt(); - - this.ioWaitCPU = in.readInt(); - this.idleCPU = in.readInt(); - this.userCPU = in.readInt(); - this.systemCPU = in.readInt(); - this.hardIrqCPU = in.readInt(); - this.softIrqCPU = in.readInt(); - - this.totalMemory = in.readLong(); - this.freeMemory = in.readLong(); - this.bufferedMemory = in.readLong(); - this.cachedMemory = in.readLong(); - this.cachedSwapMemory = in.readLong(); - - this.receivedBytes = in.readLong(); - this.transmittedBytes = in.readLong(); - } - - - @Override - public void write(final DataOutputView out) throws IOException { - super.write(out); - - out.writeInt(this.profilingInterval); - - out.writeInt(this.ioWaitCPU); - out.writeInt(this.idleCPU); - out.writeInt(this.userCPU); - out.writeInt(this.systemCPU); - out.writeInt(this.hardIrqCPU); - out.writeInt(this.softIrqCPU); - - out.writeLong(totalMemory); - out.writeLong(freeMemory); - out.writeLong(bufferedMemory); - out.writeLong(cachedMemory); - out.writeLong(cachedSwapMemory); - - out.writeLong(receivedBytes); - out.writeLong(transmittedBytes); - } - - - // -------------------------------------------------------------------------------------------- - // Utilities - // -------------------------------------------------------------------------------------------- - - @Override - public boolean equals(final Object obj) { - if (obj instanceof InstanceProfilingEvent) { - final InstanceProfilingEvent other = (InstanceProfilingEvent) obj; - - return super.equals(obj) && - profilingInterval == other.profilingInterval && - ioWaitCPU == other.ioWaitCPU && - idleCPU == other.idleCPU && - userCPU == other.userCPU && - systemCPU == other.systemCPU && - hardIrqCPU == other.hardIrqCPU && - softIrqCPU == other.softIrqCPU && - totalMemory == other.totalMemory && - freeMemory == other.freeMemory && - bufferedMemory == other.bufferedMemory && - cachedMemory == other.cachedMemory && - cachedSwapMemory == other.cachedSwapMemory && - receivedBytes == other.receivedBytes && - transmittedBytes == other.transmittedBytes; - } - else { - return false; - } - } - - - @Override - public int hashCode() { - long hashCode = getJobID().hashCode() + getTimestamp() + getProfilingTimestamp(); - hashCode += this.profilingInterval + this.ioWaitCPU + this.idleCPU + this.userCPU + this.systemCPU - + this.hardIrqCPU + this.softIrqCPU; - hashCode += (this.totalMemory + this.freeMemory + this.bufferedMemory + this.cachedMemory + this.cachedSwapMemory); - hashCode ^= hashCode >>> 32; - return (int) hashCode; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/InstanceSummaryProfilingEvent.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/InstanceSummaryProfilingEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/InstanceSummaryProfilingEvent.java deleted file mode 100644 index 4bb052d..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/InstanceSummaryProfilingEvent.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.profiling.types; - -import org.apache.flink.api.common.JobID; - -/** - * Instance summary profiling events summarize the profiling events of all instances involved in computing a job. - */ -public final class InstanceSummaryProfilingEvent extends InstanceProfilingEvent { - - private static final long serialVersionUID = 1L; - - /** - * Constructs a new instance summary profiling event. - * - * @param profilingInterval - * the interval of time this profiling event covers in milliseconds - * @param ioWaitCPU - * the percentage of time the CPU(s) spent in state IOWAIT during the profiling interval - * @param idleCPU - * the percentage of time the CPU(s) spent in state IDLE during the profiling interval - * @param userCPU - * the percentage of time the CPU(s) spent in state USER during the profiling interval - * @param systemCPU - * the percentage of time the CPU(s) spent in state SYSTEM during the profiling interval - * @param hardIrqCPU - * the percentage of time the CPU(s) spent in state HARD_IRQ during the profiling interval - * @param softIrqCPU - * the percentage of time the CPU(s) spent in state SOFT_IRQ during the profiling interval - * @param totalMemory - * the total amount of this instance's main memory in bytes - * @param freeMemory - * the free amount of this instance's main memory in bytes - * @param bufferedMemory - * the amount of main memory the instance uses for file buffers - * @param cachedMemory - * the amount of main memory the instance uses as cache memory - * @param cachedSwapMemory - * The amount of main memory the instance uses for cached swaps - * @param receivedBytes - * the number of bytes received via network during the profiling interval - * @param transmittedBytes - * the number of bytes transmitted via network during the profiling interval - * @param jobID - * the ID of this job this profiling event belongs to - * @param timestamp - * the time stamp of this profiling event's creation - * @param profilingTimestamp - * the time stamp relative to the beginning of the job's execution - */ - public InstanceSummaryProfilingEvent(final int profilingInterval, final int ioWaitCPU, final int idleCPU, - final int userCPU, final int systemCPU, final int hardIrqCPU, final int softIrqCPU, final long totalMemory, - final long freeMemory, final long bufferedMemory, final long cachedMemory, final long cachedSwapMemory, - final long receivedBytes, final long transmittedBytes, final JobID jobID, - final long timestamp, final long profilingTimestamp) { - super(profilingInterval, ioWaitCPU, idleCPU, userCPU, systemCPU, hardIrqCPU, softIrqCPU, totalMemory, - freeMemory, bufferedMemory, cachedMemory, cachedSwapMemory, receivedBytes, transmittedBytes, jobID, - timestamp, profilingTimestamp); - } - - /** - * Default constructor for serialization/deserialization. - */ - public InstanceSummaryProfilingEvent() { - super(); - } - - @Override - public boolean equals(Object obj) { - return (obj instanceof InstanceSummaryProfilingEvent) && super.equals(obj); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/ProfilingEvent.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/ProfilingEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/ProfilingEvent.java deleted file mode 100644 index 13a9a99..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/ProfilingEvent.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.profiling.types; - -import java.io.IOException; - -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.runtime.event.job.AbstractEvent; -import org.apache.flink.api.common.JobID; - -import com.google.common.base.Preconditions; - -/** - * A profiling event is a special type of event. It is intended to transport profiling data of a Nephele job to external - * components. - */ -public abstract class ProfilingEvent extends AbstractEvent { - - private static final long serialVersionUID = 1L; - - /** The ID of the job the profiling data belongs to. */ - private final JobID jobID; - - /** The profiling time stamp. */ - private long profilingTimestamp; - - /** - * Constructs a new profiling event. - * - * @param jobID - * the ID of the job this profiling events belongs to - * @param timestamp - * the time stamp of the event - * @param profilingTimestamp - * the time stamp of the profiling data - */ - public ProfilingEvent(JobID jobID, long timestamp, long profilingTimestamp) { - super(timestamp); - - Preconditions.checkNotNull(jobID); - this.jobID = jobID; - this.profilingTimestamp = profilingTimestamp; - } - - /** - * Default constructor for serialization/deserialization. - */ - public ProfilingEvent() { - super(); - this.jobID = new JobID(); - } - - // -------------------------------------------------------------------------------------------- - - /** - * Returns the ID of the job this profiling information belongs to. - * - * @return the ID of the job this profiling information belongs to - */ - public JobID getJobID() { - return this.jobID; - } - - /** - * Returns the timestamp of this profiling information. The timestamp denotes - * the time period in milliseconds between the creation of this profiling information - * and the start of the corresponding vertex's execution. - * - * @return the timestamp of this profiling information. - */ - public long getProfilingTimestamp() { - return this.profilingTimestamp; - } - - // -------------------------------------------------------------------------------------------- - // Serialization - // -------------------------------------------------------------------------------------------- - - @Override - public void read(DataInputView in) throws IOException { - super.read(in); - - this.jobID.read(in); - this.profilingTimestamp = in.readLong(); - } - - - @Override - public void write(DataOutputView out) throws IOException { - super.write(out); - - this.jobID.write(out); - out.writeLong(this.profilingTimestamp); - } - - // -------------------------------------------------------------------------------------------- - // Utilities - // -------------------------------------------------------------------------------------------- - - @Override - public boolean equals(Object obj) { - if (obj instanceof ProfilingEvent) { - final ProfilingEvent other = (ProfilingEvent) obj; - - return super.equals(obj) && this.profilingTimestamp == other.profilingTimestamp && - this.jobID.equals(other.jobID); - } - else { - return false; - } - } - - @Override - public int hashCode() { - return this.jobID.hashCode() ^ ((int) (profilingTimestamp >>> 32)) ^ ((int) (profilingTimestamp)) ^ - super.hashCode(); - } -}