Author: hashutosh
Date: Thu Jan 28 19:57:21 2010
New Revision: 904241

URL: http://svn.apache.org/viewvc?rev=904241&view=rev
Log:
(empty)
Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java hadoop/pig/trunk/test/org/apache/pig/test/TestLocalRearrange.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=904241&r1=904240&r2=904241&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Thu Jan 28 19:57:21 2010 @@ -78,6 +78,8 @@ BUG FIXES +PIG-1194: ERROR 2055: Received Error while processing the map plan (rding via ashutoshc) + PIG-1204: Pig hangs when joining two streaming relations in local mode (rding) Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java?rev=904241&r1=904240&r2=904241&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java Thu Jan 28 19:57:21 2010 @@ -57,7 +57,9 @@ protected static final TupleFactory mTupleFactory = TupleFactory.getInstance(); - transient private Log log = LogFactory.getLog(getClass()); + private static Log log = LogFactory.getLog(POLocalRearrange.class); + + private static final Result ERR_RESULT = new Result(); protected List<PhysicalPlan> plans; @@ -251,7 +253,7 @@ public Result getNext(Tuple t) throws ExecException { Result inp = null; - Result res = null; + Result res = ERR_RESULT; while (true) { inp = processInput(); if (inp.returnStatus == POStatus.STATUS_EOP || 
inp.returnStatus == POStatus.STATUS_ERR) @@ -308,9 +310,16 @@ case DataType.TUPLE: res = op.getNext(dummyTuple); break; + default: + log.error("Invalid result type: " + DataType.findType(op.getResultType())); + break; } - if(res.returnStatus!=POStatus.STATUS_OK) + + // allow null as group by key + if (res.returnStatus != POStatus.STATUS_OK && res.returnStatus != POStatus.STATUS_NULL) { return new Result(); + } + resLst.add(res); } @@ -349,15 +358,24 @@ case DataType.TUPLE: res = op.getNext(dummyTuple); break; + default: + log.error("Invalid result type: " + DataType.findType(op.getResultType())); + break; } - if(res.returnStatus!=POStatus.STATUS_OK) + + // allow null as group by key + if (res.returnStatus != POStatus.STATUS_OK && res.returnStatus != POStatus.STATUS_NULL) { return new Result(); + } + secondaryResLst.add(res); } } + // If we are using secondary sort key, our new key is: - // (nullable, index, (key, secondary key), value) - res.result = constructLROutput(resLst,secondaryResLst,(Tuple)inp.result); + // (nullable, index, (key, secondary key), value) + res.result = constructLROutput(resLst,secondaryResLst,(Tuple)inp.result); + res.returnStatus = POStatus.STATUS_OK; return res; } Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestLocalRearrange.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestLocalRearrange.java?rev=904241&r1=904240&r2=904241&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestLocalRearrange.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestLocalRearrange.java Thu Jan 28 19:57:21 2010 @@ -17,11 +17,21 @@ */ package org.apache.pig.test; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.Random; +import junit.framework.Assert; + +import 
org.apache.pig.ExecType; +import org.apache.pig.PigServer; import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.executionengine.ExecJob; import org.apache.pig.data.DataBag; import org.apache.pig.data.DataType; import org.apache.pig.data.DefaultTuple; @@ -173,4 +183,52 @@ assertEquals(db.size(), size); } + @Test + public void testMultiQueryJiraPig1194() { + + // test case: POLocalRearrange doesn't handle nulls returned by POBinCond + + String INPUT_FILE = "data.txt"; + + final MiniCluster cluster = MiniCluster.buildCluster(); + + try { + PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE)); + w.println("10\t2\t3"); + w.println("20\t3\t"); + w.close(); + Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE); + + PigServer myPig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); + + myPig.registerQuery("data = load '" + INPUT_FILE + "' as (a0, a1, a2);"); + myPig.registerQuery("grp = GROUP data BY (((double) a2)/((double) a1) > .001 OR a0 < 11 ? a0 : -1);"); + + List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings( + new String[] { + "(10,{(10,2,3)})", + "(null,{(20,3,null)})" + }); + + Iterator<Tuple> iter = myPig.openIterator("grp"); + int counter = 0; + while (iter.hasNext()) { + assertEquals(expectedResults.get(counter++).toString(), iter.next().toString()); + } + assertEquals(expectedResults.size(), counter); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } finally { + new File(INPUT_FILE).delete(); + try { + Util.deleteFile(cluster, INPUT_FILE); + } catch (IOException e) { + e.printStackTrace(); + Assert.fail(); + } + } + } + }