Author: rding Date: Mon Sep 27 19:54:37 2010 New Revision: 1001892 URL: http://svn.apache.org/viewvc?rev=1001892&view=rev Log: PIG-1641: Incorrect counters in local mode
Modified: hadoop/pig/branches/branch-0.8/CHANGES.txt hadoop/pig/branches/branch-0.8/src/org/apache/pig/tools/pigstats/InputStats.java hadoop/pig/branches/branch-0.8/src/org/apache/pig/tools/pigstats/JobStats.java hadoop/pig/branches/branch-0.8/src/org/apache/pig/tools/pigstats/OutputStats.java hadoop/pig/branches/branch-0.8/src/org/apache/pig/tools/pigstats/PigStats.java Modified: hadoop/pig/branches/branch-0.8/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/CHANGES.txt?rev=1001892&r1=1001891&r2=1001892&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.8/CHANGES.txt (original) +++ hadoop/pig/branches/branch-0.8/CHANGES.txt Mon Sep 27 19:54:37 2010 @@ -198,6 +198,8 @@ PIG-1309: Map-side Cogroup (ashutoshc) BUG FIXES +PIG-1641: Incorrect counters in local mode (rding) + PIG-1647: Logical simplifier throws a NPE (yanz) PIG-1642: Order by doesn't use estimation to determine the parallelism (rding) Modified: hadoop/pig/branches/branch-0.8/src/org/apache/pig/tools/pigstats/InputStats.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/tools/pigstats/InputStats.java?rev=1001892&r1=1001891&r2=1001892&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.8/src/org/apache/pig/tools/pigstats/InputStats.java (original) +++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/tools/pigstats/InputStats.java Mon Sep 27 19:54:37 2010 @@ -84,9 +84,9 @@ public final class InputStats { return type; } - String getDisplayString() { + String getDisplayString(boolean local) { StringBuilder sb = new StringBuilder(); - if (success) { + if (success) { sb.append("Successfully "); if (type == INPUT_TYPE.sampler) { sb.append("sampled "); @@ -95,7 +95,12 @@ public final class InputStats { } else { sb.append("read "); } - sb.append(records).append(" records "); + + if (!local) { + sb.append(records).append(" records "); + } else { + sb.append("records "); + } if (bytes > 0) { sb.append("(").append(bytes).append(" bytes) "); } Modified: hadoop/pig/branches/branch-0.8/src/org/apache/pig/tools/pigstats/JobStats.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/tools/pigstats/JobStats.java?rev=1001892&r1=1001891&r2=1001892&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.8/src/org/apache/pig/tools/pigstats/JobStats.java (original) +++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/tools/pigstats/JobStats.java Mon Sep 27 19:54:37 2010 @@ -64,7 +64,16 @@ public final class JobStats extends Oper public static final String ALIAS = "JobStatistics:alias"; public static final String FEATURE = "JobStatistics:feature"; + + public static final String SUCCESS_HEADER = "JobId\tMaps\tReduces\t" + + "MaxMapTime\tMinMapTIme\tAvgMapTime\tMaxReduceTime\t" + + "MinReduceTime\tAvgReduceTime\tAlias\tFeature\tOutputs"; + public static final String FAILURE_HEADER = "JobId\tAlias\tFeature\tMessage\tOutputs"; + + // currently counters are not working in local mode - see PIG-1286 + public static final String SUCCESS_HEADER_LOCAL = "JobId\tAlias\tFeature\tOutputs"; + private static final Log LOG = LogFactory.getLog(JobStats.class); public static enum JobState { UNKNOWN, SUCCESS, FAILED; } @@ -280,14 +289,16 @@ public final class JobStats extends Oper avgReduceTime = avg; } - String getDisplayString() { + String getDisplayString(boolean local) { StringBuilder sb = new StringBuilder(); String id = (jobId == null) ? "N/A" : jobId.toString(); - if (state == JobState.FAILED) { + if (state == JobState.FAILED || local) { sb.append(id).append("\t") .append(getAlias()).append("\t") - .append(getFeature()).append("\t") - .append("Message: ").append(getErrorMessage()).append("\t"); + .append(getFeature()).append("\t"); + if (state == JobState.FAILED) { + sb.append("Message: ").append(getErrorMessage()).append("\t"); + } } else if (state == JobState.SUCCESS) { sb.append(id).append("\t") .append(numberMaps).append("\t") Modified: hadoop/pig/branches/branch-0.8/src/org/apache/pig/tools/pigstats/OutputStats.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/tools/pigstats/OutputStats.java?rev=1001892&r1=1001891&r2=1001892&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.8/src/org/apache/pig/tools/pigstats/OutputStats.java (original) +++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/tools/pigstats/OutputStats.java Mon Sep 27 19:54:37 2010 @@ -92,11 +92,15 @@ public final class OutputStats { return conf; } - String getDisplayString() { + String getDisplayString(boolean local) { StringBuilder sb = new StringBuilder(); if (success) { - sb.append("Successfully stored ").append(records).append( - " records "); + sb.append("Successfully stored "); + if (!local) { + sb.append(records).append(" records "); + } else { + sb.append("records "); + } if (bytes > 0) { sb.append("(").append(bytes).append(" bytes) "); } Modified: hadoop/pig/branches/branch-0.8/src/org/apache/pig/tools/pigstats/PigStats.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/tools/pigstats/PigStats.java?rev=1001892&r1=1001891&r2=1001892&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.8/src/org/apache/pig/tools/pigstats/PigStats.java (original) +++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/tools/pigstats/PigStats.java Mon Sep 27 19:54:37 2010 @@ -34,6 +34,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobID; import org.apache.hadoop.mapred.jobcontrol.Job; +import org.apache.pig.ExecType; import org.apache.pig.PigException; import org.apache.pig.PigRunner.ReturnCode; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler; @@ -559,6 +560,17 @@ public final class PigStats { LOG.warn("unknown return code, can't display the results"); return; } + if (pigContext == null) { + LOG.warn("unknown exec type, don't display the results"); + return; + } + + // currently counters are not working in local mode - see PIG-1286 + ExecType execType = pigContext.getExecType(); + if (execType == ExecType.LOCAL) { + LOG.info("Detected Local mode. Stats reported below may be incomplete"); + } + SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT); StringBuilder sb = new StringBuilder(); sb.append("\nHadoopVersion\tPigVersion\tUserId\tStartedAt\tFinishedAt\tFeatures\n"); @@ -576,47 +588,52 @@ public final class PigStats { sb.append("Failed!\n"); } sb.append("\n"); + if (returnCode == ReturnCode.SUCCESS - || returnCode == ReturnCode.PARTIAL_FAILURE) { + || returnCode == ReturnCode.PARTIAL_FAILURE) { sb.append("Job Stats (time in seconds):\n"); - sb.append("JobId\tMaps\tReduces\tMaxMapTime\tMinMapTIme\t" + - "AvgMapTime\tMaxReduceTime\tMinReduceTime\tAvgReduceTime\t" + - "Alias\tFeature\tOutputs\n"); + if (execType == ExecType.LOCAL) { + sb.append(JobStats.SUCCESS_HEADER_LOCAL).append("\n"); + } else { + sb.append(JobStats.SUCCESS_HEADER).append("\n"); + } List<JobStats> arr = jobPlan.getSuccessfulJobs(); for (JobStats js : arr) { - sb.append(js.getDisplayString()); + sb.append(js.getDisplayString(execType == ExecType.LOCAL)); } sb.append("\n"); } if (returnCode == ReturnCode.FAILURE || returnCode == ReturnCode.PARTIAL_FAILURE) { sb.append("Failed Jobs:\n"); - sb.append("JobId\tAlias\tFeature\tMessage\tOutputs\n"); + sb.append(JobStats.FAILURE_HEADER).append("\n"); List<JobStats> arr = jobPlan.getFailedJobs(); for (JobStats js : arr) { - sb.append(js.getDisplayString()); + sb.append(js.getDisplayString(execType == ExecType.LOCAL)); } sb.append("\n"); } sb.append("Input(s):\n"); for (InputStats is : getInputStats()) { - sb.append(is.getDisplayString()); + sb.append(is.getDisplayString(execType == ExecType.LOCAL)); } sb.append("\n"); sb.append("Output(s):\n"); for (OutputStats ds : getOutputStats()) { - sb.append(ds.getDisplayString()); + sb.append(ds.getDisplayString(execType == ExecType.LOCAL)); } - sb.append("\nCounters:\n"); - sb.append("Total records written : " + getRecordWritten()).append("\n"); - sb.append("Total bytes written : " + getBytesWritten()).append("\n"); - sb.append("Spillable Memory Manager spill count : " - + getSMMSpillCount()).append("\n"); - sb.append("Total bags proactively spilled: " - + getProactiveSpillCountObjects()).append("\n"); - sb.append("Total records proactively spilled: " - + getProactiveSpillCountRecords()).append("\n"); + if (execType != ExecType.LOCAL) { + sb.append("\nCounters:\n"); + sb.append("Total records written : " + getRecordWritten()).append("\n"); + sb.append("Total bytes written : " + getBytesWritten()).append("\n"); + sb.append("Spillable Memory Manager spill count : " + + getSMMSpillCount()).append("\n"); + sb.append("Total bags proactively spilled: " + + getProactiveSpillCountObjects()).append("\n"); + sb.append("Total records proactively spilled: " + + getProactiveSpillCountRecords()).append("\n"); + } sb.append("\nJob DAG:\n").append(jobPlan.toString());