Author: rding
Date: Fri Jul 30 16:55:11 2010
New Revision: 980885

URL: http://svn.apache.org/viewvc?rev=980885&view=rev
Log:
PIG-1513: Pig doesn't handle empty input directory
Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=980885&r1=980884&r2=980885&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Fri Jul 30 16:55:11 2010 @@ -110,6 +110,8 @@ PIG-1309: Map-side Cogroup (ashutoshc) BUG FIXES +PIG-1513: Pig doesn't handle empty input directory (rding) + PIG-1500: guava.jar should be removed from the lib folder (niraj via rding) PIG-1034: Pig does not support ORDER ... BY group alias (zjffdu) Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java?rev=980885&r1=980884&r2=980885&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java Fri Jul 30 16:55:11 2010 @@ -76,6 +76,9 @@ public class SkewedPartitioner extends P keyTuple = (Tuple)key.getValueAsPigType(); } + // if the partition file is empty, use numPartitions + totalReducers = (totalReducers > 0) ? 
totalReducers : numPartitions; + indexes = reducerMap.get(keyTuple); // if the reducerMap does not contain the key, do the default hash based partitioning if (indexes == null) { @@ -109,7 +112,8 @@ public class SkewedPartitioner extends P Integer [] redCnt = new Integer[1]; reducerMap = MapRedUtil.loadPartitionFileFromLocalCache( keyDistFile, redCnt, DataType.TUPLE); - totalReducers = redCnt[0]; + // check if the partition file is empty + totalReducers = (redCnt[0] == null) ? -1 : redCnt[0]; } catch (Exception e) { throw new RuntimeException(e); } Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java?rev=980885&r1=980884&r2=980885&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java Fri Jul 30 16:55:11 2010 @@ -92,7 +92,8 @@ public class POPartitionRearrange extend reducerMap = MapRedUtil.loadPartitionFileFromLocalCache( keyDistFile, redCnt, DataType.NULL); - totalReducers = redCnt[0]; + // check if the partition file is empty + totalReducers = (redCnt[0] == null) ? 
-1 : redCnt[0]; loaded = true; } catch (Exception e) { throw new RuntimeException(e); Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=980885&r1=980884&r2=980885&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Fri Jul 30 16:55:11 2010 @@ -90,8 +90,10 @@ public class MapRedUtil { keyDistFile, 0); DataBag partitionList; Tuple t = loader.getNext(); - if(t==null) { - throw new RuntimeException("Empty samples file"); + if (t == null) { + // this could happen if the input directory for sampling is empty + log.warn("Empty dist file: " + keyDistFile); + return reducerMap; } // The keydist file is structured as (key, min, max) // min, max being the index of the reducers Modified: hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java?rev=980885&r1=980884&r2=980885&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java Fri Jul 30 16:55:11 2010 @@ -23,6 +23,8 @@ import java.util.LinkedList; import java.util.List; import java.util.Properties; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; @@ -40,7 +42,6 @@ import org.apache.pig.backend.hadoop.exe import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad; -import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.PigContext; @@ -50,6 +51,8 @@ import org.apache.pig.impl.plan.NodeIdGe import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.util.ObjectSerializer; + + /** * Used by MergeJoin . Takes an index on sorted data * consisting of sorted tuples of the form @@ -59,6 +62,7 @@ import org.apache.pig.impl.util.ObjectSe */ public class DefaultIndexableLoader extends LoadFunc implements IndexableLoadFunc{ + private static final Log LOG = LogFactory.getLog(DefaultIndexableLoader.class); // FileSpec of index file which will be read from HDFS. private String indexFile; @@ -160,25 +164,24 @@ public class DefaultIndexableLoader exte prevIdxEntry = curIdxEntry; } - if(matchedEntry == null){ + if (matchedEntry == null) { + LOG.warn("Empty index file: input directory is empty"); + } else { + + Object extractedKey = extractKeysFromIdxTuple(matchedEntry); - int errCode = 2165; - String errMsg = "Problem in index construction."; - throw new ExecException(errMsg,errCode,PigException.BUG); + if (extractedKey != null) { + Class idxKeyClass = extractedKey.getClass(); + if( ! firstLeftKey.getClass().equals(idxKeyClass)){ + + // This check should indeed be done on compile time. But to be on safe side, we do it on runtime also. + int errCode = 2166; + String errMsg = "Key type mismatch. Found key of type "+firstLeftKey.getClass().getCanonicalName()+" on left side. 
But, found key of type "+ idxKeyClass.getCanonicalName()+" in index built for right side."; + throw new ExecException(errMsg,errCode,PigException.BUG); + } + } } - Object extractedKey = extractKeysFromIdxTuple(matchedEntry); - - if(extractedKey != null){ - Class idxKeyClass = extractedKey.getClass(); - if( ! firstLeftKey.getClass().equals(idxKeyClass)){ - - // This check should indeed be done on compile time. But to be on safe side, we do it on runtime also. - int errCode = 2166; - String errMsg = "Key type mismatch. Found key of type "+firstLeftKey.getClass().getCanonicalName()+" on left side. But, found key of type "+ idxKeyClass.getCanonicalName()+" in index built for right side."; - throw new ExecException(errMsg,errCode,PigException.BUG); - } - } //add remaining split indexes to splitsAhead array int [] splitsAhead = new int[index.size()]; int splitsAheadIdx = 0;