Author: olga Date: Wed Sep 24 17:19:57 2008 New Revision: 698783 URL: http://svn.apache.org/viewvc?rev=698783&view=rev Log: PIG-458: move to hadoop 18
Added: incubator/pig/branches/types/lib/hadoop18.jar (with props) Modified: incubator/pig/branches/types/CHANGES.txt incubator/pig/branches/types/build.xml incubator/pig/branches/types/src/org/apache/pig/ComparisonFunc.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java Modified: incubator/pig/branches/types/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/CHANGES.txt?rev=698783&r1=698782&r2=698783&view=diff ============================================================================== --- incubator/pig/branches/types/CHANGES.txt (original) +++ incubator/pig/branches/types/CHANGES.txt Wed Sep 24 17:19:57 2008 @@ -247,3 +247,5 @@ PIG-455: "group" alias is lost after a flatten(group) (pradeepk vi olgan) + PIG-458: integration with Hadoop 19 (olgan) + Modified: incubator/pig/branches/types/build.xml URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/build.xml?rev=698783&r1=698782&r2=698783&view=diff ============================================================================== --- incubator/pig/branches/types/build.xml (original) +++ incubator/pig/branches/types/build.xml Wed Sep 24 17:19:57 2008 @@ -27,7 +27,7 @@ <property name="dist.dir" value="${build.dir}/${final.name}" /> <property name="build.encoding" value="ISO-8859-1" /> <!-- TODO with only one version of hadoop in the lib folder we do not need that anymore --> - <property name="hadoop.jarfile" value="hadoop17.jar" /> + <property name="hadoop.jarfile" value="hadoop18.jar" /> <!-- javac properties --> <property name="javac.debug" value="on" /> Added: incubator/pig/branches/types/lib/hadoop18.jar URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/lib/hadoop18.jar?rev=698783&view=auto ============================================================================== Binary file - no diff available. Propchange: incubator/pig/branches/types/lib/hadoop18.jar ------------------------------------------------------------------------------ svn:mime-type = application/octet-stream Modified: incubator/pig/branches/types/src/org/apache/pig/ComparisonFunc.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/ComparisonFunc.java?rev=698783&r1=698782&r2=698783&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/ComparisonFunc.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/ComparisonFunc.java Wed Sep 24 17:19:57 2008 @@ -33,7 +33,7 @@ protected PigProgressable reporter; public ComparisonFunc() { - super(NullableTuple.class); + super(NullableTuple.class, true); } public int compare(WritableComparable a, WritableComparable b) { Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=698783&r1=698782&r2=698783&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Wed Sep 24 17:19:57 2008 @@ -46,7 +46,6 @@ import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobSubmissionProtocol; import org.apache.hadoop.mapred.JobTracker; import org.apache.pig.FuncSpec; import org.apache.pig.backend.datastorage.DataStorage; @@ -89,7 +88,6 @@ protected DataStorage ds; - protected JobSubmissionProtocol jobTracker; protected JobClient jobClient; // key: the operator key from the logical plan that originated the physical plan @@ -110,7 +108,6 @@ this.ds = null; // to be set in the init method - this.jobTracker = null; this.jobClient = null; } @@ -194,16 +191,6 @@ if(cluster != null && !cluster.equalsIgnoreCase(LOCAL)){ log.info("Connecting to map-reduce job tracker at: " + properties.get(JOB_TRACKER_LOCATION)); - if (!LOCAL.equalsIgnoreCase(cluster)) { - try { - jobTracker = (JobSubmissionProtocol) RPC.getProxy( - JobSubmissionProtocol.class, - JobSubmissionProtocol.versionID, JobTracker - .getAddress(configuration), configuration); - } catch (IOException e) { - throw new ExecException("Failed to crate job tracker", e); - } - } } try { Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=698783&r1=698782&r2=698783&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Wed Sep 24 17:19:57 2008 @@ -332,6 +332,7 @@ jobConf.setCombinerClass(PigCombiner.Combine.class); jobConf.set("pig.combinePlan", ObjectSerializer.serialize(mro.combinePlan)); jobConf.set("pig.combine.package", ObjectSerializer.serialize(combPack)); + jobConf.setCombineOnceOnly(true); } else if (mro.needsDistinctCombiner()) { jobConf.setCombinerClass(DistinctCombiner.Combine.class); log.info("Setting identity combiner class."); Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java?rev=698783&r1=698782&r2=698783&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java Wed Sep 24 17:19:57 2008 @@ -8,6 +8,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobID; import org.apache.hadoop.mapred.RunningJob; import org.apache.hadoop.mapred.TaskReport; import org.apache.hadoop.mapred.jobcontrol.Job; @@ -85,7 +86,7 @@ } protected void getStats(Job job, JobClient jobClient) throws IOException{ - String MRJobID = job.getMapredJobID(); + JobID MRJobID = job.getAssignedJobID(); TaskReport[] mapRep = jobClient.getMapTaskReports(MRJobID); getErrorMessages(mapRep, "map"); totalHadoopTimeSpent += computeTimeSpent(mapRep); @@ -108,7 +109,7 @@ String msgs[] = reports[i].getDiagnostics(); for (int j = 0; j < msgs.length; j++) { log.error("Error message from task (" + type + ") " + - reports[i].getTaskId() + msgs[j]); + reports[i].getTaskID() + msgs[j]); } } } @@ -144,7 +145,7 @@ * @throws IOException */ protected double progressOfRunningJob(Job j, JobClient jobClient) throws IOException{ - String mrJobID = j.getMapredJobID(); + JobID mrJobID = j.getAssignedJobID(); RunningJob rj = jobClient.getJob(mrJobID); if(rj==null && j.getState()==Job.SUCCESS) return 1; Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java?rev=698783&r1=698782&r2=698783&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java Wed Sep 24 17:19:57 2008 @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.mapred.InputSplit; import org.apache.pig.FuncSpec; import org.apache.pig.LoadFunc; @@ -120,7 +121,8 @@ } public String[] getLocations() throws IOException { - BlockLocation[] b = fs.getFileBlockLocations(file, start, length); + FileStatus status = fs.getFileStatus(file); + BlockLocation[] b = fs.getFileBlockLocations(status, start, length); int total = 0; for (int i = 0; i < b.length; i++) { total += b[i].getHosts().length; Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java?rev=698783&r1=698782&r2=698783&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java Wed Sep 24 17:19:57 2008 @@ -34,6 +34,7 @@ import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; @@ -86,8 +87,8 @@ Set<String> locations = new HashSet<String>(); for (String loc : wrapped.getLocations()) { Path path = new Path(loc); - - BlockLocation[] b = fs.getFileBlockLocations(path, 0, fs.getFileStatus(path).getLen()); + FileStatus status = fs.getFileStatus(path); + BlockLocation[] b = fs.getFileBlockLocations(status, 0, status.getLen()); int total = 0; for (int i = 0; i < b.length; i++) { total += b[i].getHosts().length; Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java?rev=698783&r1=698782&r2=698783&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java Wed Sep 24 17:19:57 2008 @@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.TaskAttemptID; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream; @@ -169,7 +170,8 @@ // These are hard-coded begin/end offsets a Hadoop *taskid* int beginIndex = 25, endIndex = 31; - int tipId = Integer.parseInt(taskId.substring(beginIndex, endIndex)); + //int tipId = Integer.parseInt(taskId.substring(beginIndex, endIndex)); + int tipId = TaskAttemptID.forName(taskId).getTaskID().getId(); return tipId < command.getLogFilesLimit(); } return false; @@ -246,4 +248,4 @@ } } - \ No newline at end of file + Modified: incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java?rev=698783&r1=698782&r2=698783&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java Wed Sep 24 17:19:57 2008 @@ -33,6 +33,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.RunningJob; +import org.apache.hadoop.mapred.JobID; import org.apache.pig.FuncSpec; import org.apache.pig.PigServer; import org.apache.pig.backend.datastorage.ContainerDescriptor; @@ -276,7 +277,8 @@ protected void processKill(String jobid) throws IOException { if (mJobClient != null) { - RunningJob job = mJobClient.getJob(jobid); + JobID id = JobID.forName(jobid); + RunningJob job = mJobClient.getJob(id); if (job == null) System.out.println("Job with id " + jobid + " is not active"); else