Author: cwiklik
Date: Thu Jan 12 18:54:54 2017
New Revision: 1778456
URL: http://svn.apache.org/viewvc?rev=1778456&view=rev
Log:
UIMA-5245 refactored process metrics gathering. Uses cgroups to unify metrics sourcing.
Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsManager.java uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessCpuUsageCollector.java uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessMajorFaultCollector.java uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessResidentMemoryCollector.java uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessSwapUsageCollector.java uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/memory/DuccProcessResidentMemory.java uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessMemoryPageLoadUsage.java uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessSwapSpaceUsage.java uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/ProcessMemoryPageLoadUsage.java Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsManager.java URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsManager.java?rev=1778456&r1=1778455&r2=1778456&view=diff ============================================================================== --- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsManager.java (original) +++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsManager.java Thu Jan 12 18:54:54 2017 @@ -509,6 +509,17 @@ public class CGroupsManager { } return 
location+id+"cpuacct.usage"; } + private String composeMemoryStatFileName(String id) { + String location = getCGroupLocation("memory").trim(); + if ( !location.endsWith(System.getProperty("file.separator"))) { + location = location + System.getProperty("file.separator"); + } + if ( !legacyCgConfig ) { + location += SYSTEM+System.getProperty("file.separator"); + } + return location+id+"memory.stat"; + } + public boolean isCpuReportingEnabled() { // String file = getCGroupLocation("cpuacct")+System.getProperty("file.separator")+"cpuacct.usage"; @@ -551,7 +562,63 @@ public class CGroupsManager { usage = -1; // cgroups accounting not configured } return usage; + } + public enum CgroupMemoryStat { + RSS("rss"), + SWAP("swap"), + FAULTS("pgpgin"); + + String key; + + private CgroupMemoryStat(String aKey) { + this.key = aKey; + } + + public String getKey() { + return this.key; + } + + } + public long getUsageForMemoryStat(CgroupMemoryStat stat, String containerId ) throws Exception { + long usage = -1; + + if (!containerId.endsWith(System.getProperty("file.separator"))) { + containerId = containerId + System.getProperty("file.separator"); + } + String file = composeMemoryStatFileName(containerId.trim()); + agentLogger.info("getUsageForMemoryStat", null, "MEMORY.STAT file:"+file); + File f = new File(file); + if ( f.exists() ) { + InputStreamReader isr = new InputStreamReader(new FileInputStream(f)); + BufferedReader br = new BufferedReader(isr); + String line; + try { + while ((line = br.readLine()) != null) { + agentLogger.trace("getUsageForMemoryStat", null, "MEMORY.STAT Line:"+line); + if ( line.startsWith(stat.getKey())) { + usage = Long.parseLong(line.trim().split(" ")[1]); + break; + } + } + } catch ( Exception e) { + agentLogger.error("getUsageForMemoryStat", null, e); + } + finally { + if (isr != null) { + isr.close(); + } + agentLogger.trace("getUsageForMemoryStat", null, "Done Reading memory.stat file:"+file); + } + } else { + 
agentLogger.info("getUsageForMemoryStat", null, "MEMORY.STAT file:"+file+" Not Found - Process RSS Usage is Unavailable"); + + usage = -1; // cgroups accounting not configured + } + return usage; + } + + /** * Sets the max memory use for an existing cgroup container. * Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessCpuUsageCollector.java URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessCpuUsageCollector.java?rev=1778456&r1=1778455&r2=1778456&view=diff ============================================================================== --- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessCpuUsageCollector.java (original) +++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessCpuUsageCollector.java Thu Jan 12 18:54:54 2017 @@ -44,45 +44,7 @@ public class ProcessCpuUsageCollector im } private long collect() throws Exception{ - + // use cgroups manager to collect cpu usage return cgm.getCpuUsage(containerId); - - } -/* - private String execTopShell() throws Exception { - List<String> command = new ArrayList<String>(); - command.add("top"); - command.add("-b"); - command.add("-n"); - command.add("1"); - command.add("-p"); - command.add(pid); - - ProcessBuilder builder = new ProcessBuilder(command); - Process process = builder.start(); - InputStream is = process.getInputStream(); - InputStreamReader isr = new InputStreamReader(is); - BufferedReader br = new BufferedReader(isr); - String line; - int count = 0; - String cpu = ""; - try { - while ((line = br.readLine()) != null) { - if (count == 7) { - String[] values = line.trim().split("\\s+"); - cpu = values[9]; - process.destroy(); - break; - } - count++; - } - } finally { - if (is != null) { - is.close(); - } - } - process.waitFor(); - return cpu; } - */ } Modified: 
uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessMajorFaultCollector.java URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessMajorFaultCollector.java?rev=1778456&r1=1778455&r2=1778456&view=diff ============================================================================== --- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessMajorFaultCollector.java (original) +++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessMajorFaultCollector.java Thu Jan 12 18:54:54 2017 @@ -20,26 +20,32 @@ package org.apache.uima.ducc.agent.metri import java.util.concurrent.Callable; +import org.apache.uima.ducc.agent.launcher.CGroupsManager; +import org.apache.uima.ducc.agent.launcher.CGroupsManager.CgroupMemoryStat; import org.apache.uima.ducc.common.agent.metrics.swap.DuccProcessMemoryPageLoadUsage; import org.apache.uima.ducc.common.agent.metrics.swap.ProcessMemoryPageLoadUsage; import org.apache.uima.ducc.common.utils.DuccLogger; public class ProcessMajorFaultCollector implements Callable<ProcessMemoryPageLoadUsage> { - String pid; + private CGroupsManager cgm=null; + private String containerId=null; - public ProcessMajorFaultCollector(DuccLogger logger, String pid ) { - this.pid = pid; + public ProcessMajorFaultCollector(DuccLogger logger, CGroupsManager mgr, String containerId ) { + this.cgm = mgr; + this.containerId = containerId; } public ProcessMemoryPageLoadUsage call() throws Exception { try { - //super.parseMetricFile(); - return new DuccProcessMemoryPageLoadUsage(pid); + return new DuccProcessMemoryPageLoadUsage(collect()); } catch (Exception e) { e.printStackTrace(); throw e; } } - + private long collect() throws Exception{ + // use cgroups manager to collect rss usage + return 
cgm.getUsageForMemoryStat(CgroupMemoryStat.FAULTS,containerId); + } } Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessResidentMemoryCollector.java URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessResidentMemoryCollector.java?rev=1778456&r1=1778455&r2=1778456&view=diff ============================================================================== --- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessResidentMemoryCollector.java (original) +++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessResidentMemoryCollector.java Thu Jan 12 18:54:54 2017 @@ -18,23 +18,30 @@ */ package org.apache.uima.ducc.agent.metrics.collectors; -import java.io.RandomAccessFile; import java.util.concurrent.Callable; +import org.apache.uima.ducc.agent.launcher.CGroupsManager; +import org.apache.uima.ducc.agent.launcher.CGroupsManager.CgroupMemoryStat; import org.apache.uima.ducc.common.agent.metrics.memory.DuccProcessResidentMemory; import org.apache.uima.ducc.common.agent.metrics.memory.ProcessResidentMemory; +import org.apache.uima.ducc.common.utils.DuccLogger; -public class ProcessResidentMemoryCollector extends AbstractMetricCollector +public class ProcessResidentMemoryCollector implements Callable<ProcessResidentMemory> { - public ProcessResidentMemoryCollector(RandomAccessFile fileHandle, - int howMany, int offset) { - super(fileHandle, howMany, offset); + private String containerId=null; + private CGroupsManager cgm=null; + + public ProcessResidentMemoryCollector(DuccLogger logger, CGroupsManager mgr, String jobId) { + this.containerId = jobId; + this.cgm = mgr; } public ProcessResidentMemory call() throws Exception { - super.parseMetricFile(); - return new DuccProcessResidentMemory(super.metricFileContents, - 
super.metricFieldOffsets, super.metricFieldLengths); + return new DuccProcessResidentMemory(collect()); + } + private long collect() throws Exception{ + // use cgroups manager to collect rss usage + return cgm.getUsageForMemoryStat(CgroupMemoryStat.RSS,containerId); } } Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessSwapUsageCollector.java URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessSwapUsageCollector.java?rev=1778456&r1=1778455&r2=1778456&view=diff ============================================================================== --- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessSwapUsageCollector.java (original) +++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessSwapUsageCollector.java Thu Jan 12 18:54:54 2017 @@ -18,65 +18,34 @@ */ package org.apache.uima.ducc.agent.metrics.collectors; -import java.io.RandomAccessFile; import java.util.concurrent.Callable; -import org.apache.uima.ducc.common.agent.metrics.swap.DuccProcessMemoryPageLoadUsage; -import org.apache.uima.ducc.common.agent.metrics.swap.ProcessMemoryPageLoadUsage; +import org.apache.uima.ducc.agent.launcher.CGroupsManager; +import org.apache.uima.ducc.agent.launcher.CGroupsManager.CgroupMemoryStat; +import org.apache.uima.ducc.common.agent.metrics.swap.DuccProcessSwapSpaceUsage; +import org.apache.uima.ducc.common.agent.metrics.swap.ProcessSwapSpaceUsage; import org.apache.uima.ducc.common.utils.DuccLogger; public class ProcessSwapUsageCollector implements - Callable<ProcessMemoryPageLoadUsage> { - String pid; - - public ProcessSwapUsageCollector(DuccLogger logger, String pid, - RandomAccessFile fileHandle, int howMany, int offset) { - this.pid = pid; + Callable<ProcessSwapSpaceUsage> { + private CGroupsManager cgm=null; + private 
String containerId=null; + + public ProcessSwapUsageCollector(DuccLogger logger, CGroupsManager mgr, String jobId ) { + this.containerId = jobId; + this.cgm = mgr; } - public ProcessMemoryPageLoadUsage call() throws Exception { + public ProcessSwapSpaceUsage call() throws Exception { try { - return new DuccProcessMemoryPageLoadUsage(pid); + return new DuccProcessSwapSpaceUsage(collect()); } catch (Exception e) { e.printStackTrace(); throw e; } } -/* - private String execTopShell() throws Exception { - List<String> command = new ArrayList<String>(); - command.add("top"); - command.add("-b"); - command.add("-n"); - command.add("1"); - command.add("-p"); - command.add(pid); - - ProcessBuilder builder = new ProcessBuilder(command); - Process process = builder.start(); - InputStream is = process.getInputStream(); - InputStreamReader isr = new InputStreamReader(is); - BufferedReader br = new BufferedReader(isr); - String line; - int count = 0; - String cpu = ""; - try { - while ((line = br.readLine()) != null) { - if (count == 7) { - String[] values = line.trim().split("\\s+"); - cpu = values[9]; - process.destroy(); - break; - } - count++; - } - } finally { - if (is != null) { - is.close(); - } - } - process.waitFor(); - return cpu; + private long collect() throws Exception{ + // use cgroups manager to collect rss usage + return cgm.getUsageForMemoryStat(CgroupMemoryStat.SWAP,containerId); } - */ } Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java?rev=1778456&r1=1778455&r2=1778456&view=diff ============================================================================== --- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java (original) +++ 
uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java Thu Jan 12 18:54:54 2017 @@ -31,10 +31,13 @@ import org.apache.uima.ducc.agent.metric import org.apache.uima.ducc.agent.metrics.collectors.ProcessCpuUsageCollector; import org.apache.uima.ducc.agent.metrics.collectors.ProcessMajorFaultCollector; import org.apache.uima.ducc.agent.metrics.collectors.ProcessResidentMemoryCollector; +import org.apache.uima.ducc.agent.metrics.collectors.ProcessSwapUsageCollector; import org.apache.uima.ducc.common.agent.metrics.cpu.ProcessCpuUsage; +import org.apache.uima.ducc.common.agent.metrics.memory.DuccProcessResidentMemory; import org.apache.uima.ducc.common.agent.metrics.memory.ProcessResidentMemory; import org.apache.uima.ducc.common.agent.metrics.swap.DuccProcessSwapSpaceUsage; import org.apache.uima.ducc.common.agent.metrics.swap.ProcessMemoryPageLoadUsage; +import org.apache.uima.ducc.common.agent.metrics.swap.ProcessSwapSpaceUsage; import org.apache.uima.ducc.common.node.metrics.ProcessGarbageCollectionStats; import org.apache.uima.ducc.common.utils.DuccLogger; import org.apache.uima.ducc.common.utils.Utils; @@ -47,17 +50,12 @@ public class LinuxProcessMetricsProcesso ProcessMetricsProcessor { private RandomAccessFile statmFile; - // private RandomAccessFile nodeStatFile; private RandomAccessFile processStatFile; - //private long totalCpuInitUsage = 0; - private long previousCPUReadingInMillis = 0; private long previousSnapshotTime = 0; - //private boolean initializing = true; - private final ExecutorService pool; private IDuccProcess process; @@ -76,17 +74,16 @@ public class LinuxProcessMetricsProcesso private volatile boolean closed = true; - //private long clockAtStartOfRun = 0; private long percentCPU = 0; - + + public LinuxProcessMetricsProcessor(DuccLogger logger, IDuccProcess process, NodeAgent agent, String statmFilePath, String nodeStatFilePath, String processStatFilePath, ManagedProcess 
managedProcess) throws FileNotFoundException { this.logger = logger; statmFile = new RandomAccessFile(statmFilePath, "r"); - // nodeStatFile = new RandomAccessFile(nodeStatFilePath, "r"); processStatFile = new RandomAccessFile(processStatFilePath, "r"); this.managedProcess = managedProcess; this.agent = agent; @@ -148,335 +145,269 @@ public class LinuxProcessMetricsProcesso return true; } - public void process(Exchange e) { - if (closed) { // files closed - return; + private long getSwapUsage() throws Exception { + long swapUsage = -1; + if (agent.useCgroups) { + + String containerId = agent.cgroupsManager + .getContainerId(managedProcess); + + ProcessSwapUsageCollector processSwapCollector = new ProcessSwapUsageCollector( + logger, agent.cgroupsManager, containerId); + logger.info("LinuxProcessMetricsProcessor.getSwapUsage", null, + "Fetching Swap Usage PID:" + process.getPID()); + Future<ProcessSwapSpaceUsage> processFaults = pool + .submit(processSwapCollector); + swapUsage = processFaults.get().getSwapUsage(); + logger.info("LinuxProcessMetricsProcessor.getSwapUsage", null, + " Process Swap Usage:" + swapUsage); + } + return swapUsage; + } + + private long getFaults() throws Exception { + long faults = -1; + if (agent.useCgroups) { + String containerId = agent.cgroupsManager.getContainerId(managedProcess); + + ProcessMajorFaultCollector processFaultsCollector = + new ProcessMajorFaultCollector(logger, agent.cgroupsManager, containerId); + logger.info("LinuxProcessMetricsProcessor.getFaults",null,"Fetching Page Faults PID:"+process.getPID()); + Future<ProcessMemoryPageLoadUsage> processFaults = pool.submit(processFaultsCollector); + faults = processFaults.get().getMajorFaults(); + logger.info( + "LinuxProcessMetricsProcessor.getFaults",null," Process Faults (pgpgin):"+faults); + } + return faults; + } + private long getRss() throws Exception { + long rss = -1; + if (agent.useCgroups) { + String containerId = agent.cgroupsManager.getContainerId(managedProcess); + 
+ ProcessResidentMemoryCollector processRSSCollector = + new ProcessResidentMemoryCollector(logger, agent.cgroupsManager, containerId); + logger.info("LinuxProcessMetricsProcessor.getRss",null,"Fetching RSS Usage for PID:"+process.getPID()); + Future<ProcessResidentMemory> processRss = pool.submit(processRSSCollector); + rss = processRss.get().get(); + logger.info( + "LinuxProcessMetricsProcessor.getRss",null," Process RSS:"+rss); } + return rss; + } + + private long getCpuUsage() throws Exception { + long cpuUsage=-1; + if (agent.useCgroups) { + String containerId = agent.cgroupsManager.getContainerId(managedProcess); + + Future<ProcessCpuUsage> processCpuUsage = null; + ProcessCpuUsageCollector processCpuUsageCollector = + new ProcessCpuUsageCollector(logger, agent.cgroupsManager, containerId); + logger.info("LinuxProcessMetricsProcessor.getCpuUsage",null,"Fetching CPU Usage for PID:"+process.getPID()); + processCpuUsage = pool + .submit(processCpuUsageCollector); + long cpuUsageInNanos = processCpuUsage.get().getCpuUsage(); + if ( cpuUsageInNanos >= 0 ) { + // cpuUsage comes from cpuacct.usage and is in nanos + cpuUsage = Math.round( cpuUsageInNanos / 1000000 ); // normalize into millis + } + logger.info( + "LinuxProcessMetricsProcessor.getCpuUsage",null, + "CPU USAGE:"+cpuUsageInNanos+ " CLOCK RATE:"+agent.cpuClockRate+" Total CPU USAGE:"+cpuUsage); + } + return cpuUsage; + } + + private long getCpuTime( long totalCpuUsageInMillis) throws Exception { + long cp = -1; + if (managedProcess.getDuccProcess().getProcessState() + .equals(ProcessState.Running) || + managedProcess.getDuccProcess().getProcessState() + .equals(ProcessState.Initializing) + ) { + if (agent.useCgroups && totalCpuUsageInMillis != -1) { + + long timeRunning = 1; + if ( process.getTimeWindowInit() != null ) { + timeRunning = process.getTimeWindowInit().getElapsedMillis(); + } + if ( process.getTimeWindowRun() != null ) { + timeRunning += process.getTimeWindowRun().getElapsedMillis(); + } + // 
normalize time in running state into seconds + percentCPU = Math.round(100*( (totalCpuUsageInMillis*1.0)/ (timeRunning*1.0))); + cp = percentCPU; + } + } else { + cp = percentCPU; + } + return cp; + } + private long getCurrentCpu(long totalCpuUsageInMillis ) { + long currentCpu=-1; + // publish current CPU usage by computing a delta from the last time + // CPU data was fetched. + if ( totalCpuUsageInMillis > 0 ) { + double millisCPU = ( totalCpuUsageInMillis - previousCPUReadingInMillis )*1.0; + double millisRun = ( System.currentTimeMillis() - previousSnapshotTime )*1.0; + currentCpu = Math.round(100*(millisCPU/millisRun) ) ; + previousCPUReadingInMillis = totalCpuUsageInMillis; + previousSnapshotTime = System.currentTimeMillis(); + } else { + if (agent.useCgroups && totalCpuUsageInMillis != -1 ) { + currentCpu = 0; + } + } + return currentCpu; + } + - // if process is stopping or already dead dont collect metrics. The - // Camel - // route has just been stopped. - if (!collectStats(process.getProcessState())) { + private void killProcsIfExceedingMemoryThreshold() throws Exception { + if ( !agent.useCgroups ) { return; } - if (process.getProcessState().equals(ProcessState.Initializing) - || process.getProcessState().equals(ProcessState.Running)) - try { + if (process.getSwapUsage() > 0 + && process.getSwapUsage() > managedProcess + .getMaxSwapThreshold()) { + } else { + String containerId = agent.cgroupsManager + .getContainerId(managedProcess); + + String[] cgroupPids = agent.cgroupsManager + .getPidsInCgroup(containerId); + logger.info("LinuxProcessMetricsProcessor.process",null,"Container ID:"+containerId+" cgroup pids "+cgroupPids.length); + + // Use Memory Guard only if cgroups are disabled and fudge + // factor > -1 + + if ( fudgeFactor > -1 + && managedProcess.getProcessMemoryAssignment() + .getMaxMemoryWithFudge() > 0) { - // executes script - // DUCC_HOME/admin/ducc_get_process_swap_usage.sh which sums up - // swap used by - // a process - long 
totalSwapUsage = 0; - long totalFaults = 0; - long totalCpuUsageInMillis = 0; - long totalRss = 0; - //int currentCpuUsage = 0; - Future<ProcessMemoryPageLoadUsage> processMajorFaultUsage = null; - Future<ProcessCpuUsage> processCpuUsage = null; - String[] cgroupPids = new String[0]; - try { - String swapUsageScript = System - .getProperty("ducc.agent.swap.usage.script"); - - if (agent.useCgroups) { - String containerId = agent.cgroupsManager - .getContainerId(managedProcess); - cgroupPids = agent.cgroupsManager - .getPidsInCgroup(containerId); - for (String pid : cgroupPids) { - // the swap usage script is defined in - // ducc.properties - if (swapUsageScript != null) { - DuccProcessSwapSpaceUsage processSwapSpaceUsage = new DuccProcessSwapSpaceUsage( - pid, managedProcess.getOwner(), - swapUsageScript, logger); - totalSwapUsage += processSwapSpaceUsage - .getSwapUsage(); - } + long rss = (process.getResidentMemory() / 1024) / 1024; // normalize RSS into MB - ProcessMajorFaultCollector processMajorFaultUsageCollector = new ProcessMajorFaultCollector( - logger, pid); - // if process is stopping or already dead dont - // collect metrics. The Camel - // route has just been stopped. - if (!collectStats(process.getProcessState())) { - return; - } - - processMajorFaultUsage = pool - .submit(processMajorFaultUsageCollector); - totalFaults += processMajorFaultUsage.get() - .getMajorFaults(); - try { - if (!collectStats(process.getProcessState())) { - return; + logger.trace( + "process", + null, + "*** Process with PID:" + + managedProcess.getPid() + + " Assigned Memory (MB): " + + managedProcess + .getProcessMemoryAssignment() + + " MBs. 
Current RSS (MB):" + rss); + // check if process resident memory exceeds its memory + // assignment calculate in the PM + if (rss > managedProcess.getProcessMemoryAssignment() + .getMaxMemoryWithFudge()) { + logger.error( + "process", + null, + "\n\n********************************************************\n\tProcess with PID:" + + managedProcess.getPid() + + " Exceeded its max memory assignment (including a fudge factor) of " + + managedProcess + .getProcessMemoryAssignment() + .getMaxMemoryWithFudge() + + " MBs. This Process Resident Memory Size: " + + rss + + " MBs .Killing process ...\n********************************************************\n\n"); + try { + managedProcess.kill(); // mark it for death + process.setReasonForStoppingProcess(ReasonForStoppingProcess.ExceededShareSize + .toString()); + agent.stopProcess(process); + + if (agent.useCgroups) { + for (String pid : cgroupPids) { + // skip the main process that was just + // killed above. Only kill + // its child processes. + if (pid.equals(managedProcess + .getDuccProcess().getPID())) { + continue; } - - } catch( Exception ee) { - logger.warn( - "LinuxProcessMetricsProcessor.process", - null,ee); - } - RandomAccessFile rStatmFile = null; - try { - rStatmFile = new RandomAccessFile("/proc/" - + pid + "/statm", "r"); - } catch (FileNotFoundException fnfe) { - logger.info( - "LinuxProcessMetricsProcessor.process", - null, - "Statm File:" - + "/proc/" - + pid - + "/statm *Not Found*. Process must have already exited"); - return; - } - ProcessResidentMemoryCollector collector = new ProcessResidentMemoryCollector( - rStatmFile, 2, 0); - // if process is stopping or already dead dont - // collect metrics. The Camel - // route has just been stopped. 
- if (!collectStats(process.getProcessState())) { - return; + killChildProcess(pid, "-15"); } + } + } catch (Exception ee) { + if (!collectStats(process.getProcessState())) { + return; + } + logger.error("process", null, ee); + } + return; + } + } - Future<ProcessResidentMemory> prm = pool - .submit(collector); - - totalRss += prm.get().get(); + } - rStatmFile.close(); - } + } - ProcessCpuUsageCollector processCpuUsageCollector = - new ProcessCpuUsageCollector(logger, agent.cgroupsManager, containerId); - processCpuUsage = pool - .submit(processCpuUsageCollector); - long cpuUsageInNanos = processCpuUsage.get().getCpuUsage(); - if ( cpuUsageInNanos >= 0 ) { - // cpuUsage comes from cpuacct.usage and is in nanos - totalCpuUsageInMillis = Math.round( cpuUsageInNanos / 1000000 ); // normalize into millis - } else { - totalCpuUsageInMillis = -1; - } - logger.info( - "LinuxProcessMetricsProcessor.process",null, - "CPU USAGE:"+cpuUsageInNanos+ " CLOCK RATE:"+agent.cpuClockRate+" Total CPU USAGE:"+totalCpuUsageInMillis); - } else { - if (swapUsageScript != null) { - DuccProcessSwapSpaceUsage processSwapSpaceUsage = new DuccProcessSwapSpaceUsage( - process.getPID(), - managedProcess.getOwner(), swapUsageScript, - logger); - totalSwapUsage = processSwapSpaceUsage - .getSwapUsage(); - } + private ProcessGarbageCollectionStats getGCStats() throws Exception { + if (!process.getProcessType().equals(ProcessType.Pop)) { + ProcessGarbageCollectionStats gcStats = gcStatsCollector + .collect(); + return gcStats; + } + return new ProcessGarbageCollectionStats(); + } + public boolean processIsActive() { + return process.getProcessState().equals(ProcessState.Starting) + || + process.getProcessState().equals(ProcessState.Initializing) + || + process.getProcessState().equals(ProcessState.Running); + } + public void process(Exchange e) { + // if process is stopping or already dead dont collect metrics. The + // Camel route has just been stopped. 
+ if (closed || !processIsActive()) { + logger.info("LinuxProcessMetricsProcessor.process", null,"Process with PID:"+process.getPID() +" not in Running or Initializing state. Returning"); + return; + } + try { + + process.setSwapUsage(getSwapUsage()); + process.setMajorFaults(getFaults()); - ProcessMajorFaultCollector processMajorFaultUsageCollector = new ProcessMajorFaultCollector( - logger, process.getPID()); + long rssInBytes = getRss(); + process.setResidentMemory(rssInBytes); - // if process is stopping or already dead dont collect - // metrics. The Camel - // route has just been stopped. - if (!collectStats(process.getProcessState())) { - return; - } - processMajorFaultUsage = pool - .submit(processMajorFaultUsageCollector); - totalFaults = processMajorFaultUsage.get() - .getMajorFaults(); - // Cgroups are not available so percent CPU is not available - totalCpuUsageInMillis = -1; // -1 stands for N/A - //currentCpuUsage = -1; // -1 stands for N/A - - ProcessResidentMemoryCollector collector = new ProcessResidentMemoryCollector( - statmFile, 2, 0); - // if process is stopping or already dead dont collect - // metrics. The Camel - // route has just been stopped. 
- if (!collectStats(process.getProcessState())) { - return; - } + long totalCpuUsageInMillis = getCpuUsage(); - Future<ProcessResidentMemory> prm = pool - .submit(collector); - totalRss = prm.get().get(); - } + // set CPU time in terms of % + process.setCpuTime(getCpuTime(totalCpuUsageInMillis)); - } catch (Exception exc) { - if (!collectStats(process.getProcessState())) { - return; - } - logger.error("LinuxProcessMetricsProcessor.process", null, - exc); - } + process.setCurrentCPU(getCurrentCpu(totalCpuUsageInMillis)); - // report cpu utilization while the process is running - if (managedProcess.getDuccProcess().getProcessState() - .equals(ProcessState.Running) || - managedProcess.getDuccProcess().getProcessState() - .equals(ProcessState.Initializing) - ) { - if (agent.useCgroups && totalCpuUsageInMillis != -1) { - - long timeRunning = 1; - if ( process.getTimeWindowInit() != null ) { - timeRunning = process.getTimeWindowInit().getElapsedMillis(); - } - if ( process.getTimeWindowRun() != null ) { - timeRunning += process.getTimeWindowRun().getElapsedMillis(); - } - // normalize time in running state into seconds - percentCPU = Math.round(100*( (totalCpuUsageInMillis*1.0)/ (timeRunning*1.0))); - process.setCpuTime( percentCPU ); - } else { - process.setCpuTime(-1); // -1 stands for N/A - percentCPU = -1; - } - } else { - // if process is not dead, report the last known percentCPU - process.setCpuTime(percentCPU); - } - // publish current CPU usage by computing a delta from the last time - // CPU data was fetched. 
- if ( totalCpuUsageInMillis > 0 ) { - double millisCPU = ( totalCpuUsageInMillis - previousCPUReadingInMillis )*1.0; - double millisRun = ( System.currentTimeMillis() - previousSnapshotTime )*1.0; - process.setCurrentCPU(Math.round(100*(millisCPU/millisRun) ) ); - previousCPUReadingInMillis = totalCpuUsageInMillis; - previousSnapshotTime = System.currentTimeMillis(); - - } else { - if (agent.useCgroups && totalCpuUsageInMillis != -1 ) { - process.setCurrentCPU(0); - } else { - process.setCurrentCPU(-1); // -1 stands for N/A - } - } - logger.info( + ProcessGarbageCollectionStats gcStats = getGCStats(); + process.setGarbageCollectionStats(gcStats); + logger.info( "process", null, - "----------- PID:" + process.getPID() - + " Total CPU Time (%):" + process.getCpuTime() - + " Delta CPU Time (%):" +process.getCurrentCPU() ); - // collects process Major faults (swap in memory) - process.setMajorFaults(totalFaults); - // Current Process Swap Usage in bytes - long st = System.currentTimeMillis(); - long processSwapUsage = totalSwapUsage * 1024; - // collects swap usage from /proc/<PID>/smaps file via a script - // DUCC_HOME/admin/collect_process_swap_usage.sh - process.setSwapUsage(processSwapUsage); - logger.info( - "process", - null, - "----------- PID:" + process.getPID() - + " Major Faults:" + totalFaults - + " Process Swap Usage:" + processSwapUsage - + " Max Swap Usage Allowed:" - + managedProcess.getMaxSwapThreshold() - + " Time to Collect Swap Usage:" - + (System.currentTimeMillis() - st)); - if (processSwapUsage > 0 - && processSwapUsage > managedProcess - .getMaxSwapThreshold()) { - } else { - // Use Memory Guard only if cgroups are disabled and fudge - // factor > -1 - - if (!agent.useCgroups - && fudgeFactor > -1 - && managedProcess.getProcessMemoryAssignment() - .getMaxMemoryWithFudge() > 0) { - // RSS is in terms of pages(blocks) which size is system - // dependent. 
Default 4096 bytes - long rss = (totalRss * (blockSize / 1024)) / 1024; // normalize - // RSS - // into - // MB - logger.trace( - "process", - null, - "*** Process with PID:" - + managedProcess.getPid() - + " Assigned Memory (MB): " - + managedProcess - .getProcessMemoryAssignment() - + " MBs. Current RSS (MB):" + rss); - // check if process resident memory exceeds its memory - // assignment calculate in the PM - if (rss > managedProcess.getProcessMemoryAssignment() - .getMaxMemoryWithFudge()) { - logger.error( - "process", - null, - "\n\n********************************************************\n\tProcess with PID:" - + managedProcess.getPid() - + " Exceeded its max memory assignment (including a fudge factor) of " - + managedProcess - .getProcessMemoryAssignment() - .getMaxMemoryWithFudge() - + " MBs. This Process Resident Memory Size: " - + rss - + " MBs .Killing process ...\n********************************************************\n\n"); - try { - managedProcess.kill(); // mark it for death - process.setReasonForStoppingProcess(ReasonForStoppingProcess.ExceededShareSize - .toString()); - agent.stopProcess(process); - - if (agent.useCgroups) { - for (String pid : cgroupPids) { - // skip the main process that was just - // killed above. Only kill - // its child processes. - if (pid.equals(managedProcess - .getDuccProcess().getPID())) { - continue; - } - killChildProcess(pid, "-15"); - } - } - } catch (Exception ee) { - if (!collectStats(process.getProcessState())) { - return; - } - logger.error("process", null, ee); - } - return; - } - } - - } - // Publish resident memory - process.setResidentMemory((totalRss * blockSize)); - // dont collect GC metrics for POPs. 
May not be java or may not - // be a jmx enabled java process - if (!process.getProcessType().equals(ProcessType.Pop)) { - ProcessGarbageCollectionStats gcStats = gcStatsCollector - .collect(); - process.setGarbageCollectionStats(gcStats); - logger.info( - "process", - null, - "PID:" + process.getPID() - + " Total GC Collection Count :" - + gcStats.getCollectionCount() - + " Total GC Collection Time :" - + gcStats.getCollectionTime()); - } - - } catch (Exception ex) { - // if the child process is not running dont log the exception. - if (!collectStats(process.getProcessState())) { - return; - } - logger.error("process", null, ex); - ex.printStackTrace(); + "----------- PID:" + process.getPID() + " RSS:" + + ((rssInBytes > -1) ? (rssInBytes / (1024 * 1024))+ " MB" : "-1") + + " Total CPU Time (%):" + process.getCpuTime() + + " Delta CPU Time (%):" + process.getCurrentCPU() + + " Major Faults:" + process.getMajorFaults() + + " Process Swap Usage:" + process.getSwapUsage() + + " Max Swap Usage Allowed:" + + managedProcess.getMaxSwapThreshold() + + " Total GC Collection Count :" + + gcStats.getCollectionCount() + + " Total GC Collection Time :" + + gcStats.getCollectionTime()); + + killProcsIfExceedingMemoryThreshold(); + + } catch (Exception exc) { + if (!collectStats(process.getProcessState())) { + return; } - + logger.error("LinuxProcessMetricsProcessor.process", null, exc); + } } private void killChildProcess(final String pid, final String signal) { Modified: uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/memory/DuccProcessResidentMemory.java URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/memory/DuccProcessResidentMemory.java?rev=1778456&r1=1778455&r2=1778456&view=diff ============================================================================== --- 
uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/memory/DuccProcessResidentMemory.java (original) +++ uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/memory/DuccProcessResidentMemory.java Thu Jan 12 18:54:54 2017 @@ -18,25 +18,17 @@ */ package org.apache.uima.ducc.common.agent.metrics.memory; -import org.apache.uima.ducc.common.node.metrics.ByteBufferParser; - -public class DuccProcessResidentMemory extends ByteBufferParser implements +public class DuccProcessResidentMemory implements ProcessResidentMemory { private static final long serialVersionUID = 8563460863767404377L; - private static final int TOTAL = 0; - private static final int RESIDENT = 1; - - public DuccProcessResidentMemory(byte[] memInfoBuffer, - int[] memInfoFieldOffsets, int[] memInfoFiledLengths) { - super(memInfoBuffer, memInfoFieldOffsets, memInfoFiledLengths); + long rss; + + public DuccProcessResidentMemory(long rss) { + this.rss = rss; } public long get() { - return super.getFieldAsLong(RESIDENT); - } - - public long getTotal() { - return super.getFieldAsLong(TOTAL); + return rss; } } Modified: uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessMemoryPageLoadUsage.java URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessMemoryPageLoadUsage.java?rev=1778456&r1=1778455&r2=1778456&view=diff ============================================================================== --- uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessMemoryPageLoadUsage.java (original) +++ uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessMemoryPageLoadUsage.java Thu Jan 12 18:54:54 2017 @@ -18,57 +18,13 @@ */ package org.apache.uima.ducc.common.agent.metrics.swap; 
-import java.io.BufferedReader; -import java.io.InputStream; -import java.io.InputStreamReader; - -public class DuccProcessMemoryPageLoadUsage implements - ProcessMemoryPageLoadUsage { - String pid; +public class DuccProcessMemoryPageLoadUsage implements ProcessMemoryPageLoadUsage{ + long faults; - public DuccProcessMemoryPageLoadUsage(String pid) { - this.pid = pid; + public DuccProcessMemoryPageLoadUsage(long faults) { + this.faults = faults; } - public long getMajorFaults() throws Exception { - return collectProcessMajorFaults(); - } - private long collectProcessMajorFaults() throws Exception { - String[] command = new String[] {"/bin/ps","-o","maj_flt",pid}; - - ProcessBuilder builder = new ProcessBuilder(command); - builder.redirectErrorStream(true); - Process process = builder.start(); - InputStream is = process.getInputStream(); - if ( is != null ) { - InputStreamReader isr = new InputStreamReader(is); - BufferedReader br = new BufferedReader(isr); - String line; - int count = 0; - String faults = null; - try { - while ((line = br.readLine()) != null) { - // skip the header line - if (count == 1) { - faults = line.trim(); - } - count++; - } - } finally { - if (is != null) { - is.close(); - } - process.waitFor(); - process.destroy(); - } - if ( faults != null) { - return Long.parseLong(faults.trim()); - } else { - return 0; - } - - } - return 0; - + public long getMajorFaults() { + return faults; } - } Modified: uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessSwapSpaceUsage.java URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessSwapSpaceUsage.java?rev=1778456&r1=1778455&r2=1778456&view=diff ============================================================================== --- uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessSwapSpaceUsage.java 
(original) +++ uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessSwapSpaceUsage.java Thu Jan 12 18:54:54 2017 @@ -19,72 +19,13 @@ package org.apache.uima.ducc.common.agent.metrics.swap; -import java.io.BufferedReader; -import java.io.InputStreamReader; - - -import org.apache.uima.ducc.common.utils.DuccLogger; -import org.apache.uima.ducc.common.utils.Utils; - public class DuccProcessSwapSpaceUsage implements ProcessSwapSpaceUsage { - String pid=null; - String execScript=null; - DuccLogger logger=null; - String[] command; + long swapusage; - public DuccProcessSwapSpaceUsage( String pid, String owner, String execScript, DuccLogger logger) { - this.pid = pid; - this.execScript = execScript; - this.logger = logger; - String c_launcher_path = - Utils.resolvePlaceholderIfExists( - System.getProperty("ducc.agent.launcher.ducc_spawn_path"),System.getProperties()); - command = new String[] { c_launcher_path, - "-u", owner, "--", execScript, pid }; + public DuccProcessSwapSpaceUsage( long swapusage) { + this.swapusage = swapusage; } public long getSwapUsage() { - long swapusage=0; - if ( pid != null && execScript != null ) { - InputStreamReader in = null; - try { - ProcessBuilder pb = new ProcessBuilder(); - //String[] command = {execScript,pid}; - pb.command(command); //command); - - //logger.info("------------ getSwapUsage-", null, cmd); - pb.redirectErrorStream(true); - Process swapCollectorProcess = pb.start(); - in = new InputStreamReader(swapCollectorProcess.getInputStream()); - BufferedReader reader = new BufferedReader(in); - String line=null; - boolean skip = true; - while ((line = reader.readLine()) != null) { - try { - if ( line.startsWith("1001")) { - skip = false; - continue; - } - if (!skip) { - swapusage = Long.parseLong(line.trim()); - logger.info("getSwapUsage-",null, "PID:"+pid+" Swap Usage:"+line); - } - } catch( NumberFormatException e) { - logger.error("getSwapUsage", null, line); - } - } - } 
catch( Exception e) { - logger.error("getSwapUsage", null, e); - } finally { - if ( in != null ) { - try { - in.close(); - } catch( Exception e) { - logger.error("getSwapUsage", null, e); - } - - } - } - } return swapusage; } Modified: uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/ProcessMemoryPageLoadUsage.java URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/ProcessMemoryPageLoadUsage.java?rev=1778456&r1=1778455&r2=1778456&view=diff ============================================================================== --- uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/ProcessMemoryPageLoadUsage.java (original) +++ uima/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/ProcessMemoryPageLoadUsage.java Thu Jan 12 18:54:54 2017 @@ -20,5 +20,5 @@ package org.apache.uima.ducc.common.agent.metrics.swap; public interface ProcessMemoryPageLoadUsage { - public long getMajorFaults() throws Exception; + public long getMajorFaults(); }