Author: olga
Date: Thu Sep 18 13:46:59 2008
New Revision: 696795

URL: http://svn.apache.org/viewvc?rev=696795&view=rev
Log:
PIG-253: integration with Hadoop 18

Added: incubator/pig/trunk/lib/hadoop18.jar (with props) Modified: incubator/pig/trunk/build.xml incubator/pig/trunk/src/org/apache/pig/ComparisonFunc.java incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HFile.java incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SliceWrapper.java incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java Modified: incubator/pig/trunk/build.xml URL: http://svn.apache.org/viewvc/incubator/pig/trunk/build.xml?rev=696795&r1=696794&r2=696795&view=diff ============================================================================== --- incubator/pig/trunk/build.xml (original) +++ incubator/pig/trunk/build.xml Thu Sep 18 13:46:59 2008 @@ -58,7 +58,7 @@ <property name="build.javadoc" value="${build.docs}/api" /> <property name="build.encoding" value="ISO-8859-1" /> <!-- TODO with only one version of hadoop in the lib folder we do not need that anymore --> - <property name="hadoop.jarfile" value="hadoop17.jar" /> + <property name="hadoop.jarfile" value="hadoop18.jar" /> <!-- distribution properties --> <property name="staging.dir" value="${build.dir}/staging"/> @@ -85,7 +85,7 @@ <property name="junit.hadoop.conf" value="" /> <property name="test.log.dir" value="${basedir}/test/logs"/> <property name="junit.hadoop.conf" value="${user.home}/pigtest/conf/"/> - <property name="test.output" value="no"/> + <property name="test.output" value="yes"/> <!-- javadoc properties --> <property name="javadoc.link.java" value="http://java.sun.com/j2se/1.5.0/docs/api/" /> Added: incubator/pig/trunk/lib/hadoop18.jar URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/lib/hadoop18.jar?rev=696795&view=auto ============================================================================== Binary file - no diff available. Propchange: incubator/pig/trunk/lib/hadoop18.jar ------------------------------------------------------------------------------ svn:mime-type = application/octet-stream Modified: incubator/pig/trunk/src/org/apache/pig/ComparisonFunc.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/ComparisonFunc.java?rev=696795&r1=696794&r2=696795&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/ComparisonFunc.java (original) +++ incubator/pig/trunk/src/org/apache/pig/ComparisonFunc.java Thu Sep 18 13:46:59 2008 @@ -26,7 +26,7 @@ public abstract class ComparisonFunc extends WritableComparator { public ComparisonFunc() { - super(Tuple.class); + super(Tuple.class, true); } public int compare(WritableComparable a, WritableComparable b) { Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HFile.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HFile.java?rev=696795&r1=696794&r2=696795&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HFile.java (original) +++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HFile.java Thu Sep 18 13:46:59 2008 @@ -75,6 +75,6 @@ public SeekableInputStream sopen() throws IOException { return new HSeekableInputStream(fs.getHFS().open(path), - fs.getHFS().getContentLength(path)); + fs.getHFS(). 
getContentSummary(path).getLength()); } } Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=696795&r1=696794&r2=696795&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original) +++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Thu Sep 18 13:46:59 2008 @@ -45,7 +45,6 @@ import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobSubmissionProtocol; import org.apache.hadoop.mapred.JobTracker; import org.apache.pig.backend.datastorage.DataStorage; import org.apache.pig.backend.executionengine.ExecException; @@ -80,7 +79,6 @@ protected DataStorage ds; - protected JobSubmissionProtocol jobTracker; protected JobClient jobClient; // key: the operator key from the logical plan that originated the physical plan @@ -101,7 +99,6 @@ this.ds = null; // to be set in the init method - this.jobTracker = null; this.jobClient = null; } @@ -185,16 +182,6 @@ if(cluster != null && !cluster.equalsIgnoreCase(LOCAL)){ log.info("Connecting to map-reduce job tracker at: " + properties.get(JOB_TRACKER_LOCATION)); - if (!LOCAL.equalsIgnoreCase(cluster)) { - try { - jobTracker = (JobSubmissionProtocol) RPC.getProxy( - JobSubmissionProtocol.class, - JobSubmissionProtocol.versionID, JobTracker - .getAddress(configuration), configuration); - } catch (IOException e) { - throw new ExecException("Failed to crate job tracker", e); - } - } } try { Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java?rev=696795&r1=696794&r2=696795&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java (original) +++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java Thu Sep 18 13:46:59 2008 @@ -175,6 +175,9 @@ } if (pom.toCombine != null) { conf.set("pig.combineFunc", ObjectSerializer.serialize(pom.toCombine)); + // this is to make sure that combiner is only called once + // since we can't handle no combine or multiple combines + conf.setCombineOnceOnly(true); } if (pom.groupFuncs != null) { conf.set("pig.groupFuncs", ObjectSerializer.serialize(pom.groupFuncs)); Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java?rev=696795&r1=696794&r2=696795&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java (original) +++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java Thu Sep 18 13:46:59 2008 @@ -70,7 +70,10 @@ } } - index = PigInputFormat.getActiveSplit().getIndex(); + if (PigInputFormat.getActiveSplit() == null) { + } else { + index = PigInputFormat.getActiveSplit().getIndex(); + } Datum groupName = key.getField(0); finalout.group = key; Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SliceWrapper.java URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SliceWrapper.java?rev=696795&r1=696794&r2=696795&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SliceWrapper.java (original) +++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SliceWrapper.java Thu Sep 18 13:46:59 2008 @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; @@ -90,11 +91,12 @@ Set<String> locations = new HashSet<String>(); for (String loc : wrapped.getLocations()) { Path path = new Path(loc); - String hints[][] = fs.getFileCacheHints(path, 0, fs.getFileStatus( - path).getLen()); - for (int i = 0; i < hints.length; i++) { - for (int j = 0; j < hints[i].length; j++) { - locations.add(hints[i][j]); + BlockLocation[] blocks = fs.getFileBlockLocations(path, 0, fs.getFileStatus( + path).getLen()); + for (int i = 0; i < blocks.length; i++) { + String[] hosts = blocks[i].getHosts(); + for (int j = 0; j < hosts.length; j++){ + locations.add(hosts[j]); } } } Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java?rev=696795&r1=696794&r2=696795&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java (original) +++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java Thu Sep 18 13:46:59 2008 @@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FileUtil; import 
org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.TaskAttemptID; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.PigMapReduce; import org.apache.pig.impl.eval.collector.DataCollector; @@ -169,10 +170,7 @@ */ private boolean writeErrorToHDFS(int limit, String taskId) { if (command.getPersistStderr()) { - // These are hard-coded begin/end offsets a Hadoop *taskid* - int beginIndex = 25, endIndex = 31; - - int tipId = Integer.parseInt(taskId.substring(beginIndex, endIndex)); + int tipId = TaskAttemptID.forName(taskId).getTaskID().getId(); return tipId < command.getLogFilesLimit(); } return false; @@ -249,4 +247,4 @@ } } - \ No newline at end of file +