Author: thejas Date: Tue Aug 3 17:43:33 2010 New Revision: 981984 URL: http://svn.apache.org/viewvc?rev=981984&view=rev Log: PIG-1516: finalize in bag implementations causes pig to run out of memory in reduce (thejas)
Added: hadoop/pig/trunk/src/org/apache/pig/data/FileList.java Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java hadoop/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java hadoop/pig/trunk/src/org/apache/pig/data/SortedDataBag.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=981984&r1=981983&r2=981984&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Tue Aug 3 17:43:33 2010 @@ -110,6 +110,8 @@ PIG-1309: Map-side Cogroup (ashutoshc) BUG FIXES +PIG-1516: finalize in bag implementations causes pig to run out of memory in reduce (thejas) + PIG-1521: explain plan does not show correct Physical operator in MR plan when POSortedDistinct, POPackageLite are used (thejas) PIG-1513: Pig doesn't handle empty input directory (rding) Modified: hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java?rev=981984&r1=981983&r2=981984&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java Tue Aug 3 17:43:33 2010 @@ -57,7 +57,7 @@ public abstract class DefaultAbstractBag protected Collection<Tuple> mContents; // Spill files we've created. These need to be removed in finalize. - protected ArrayList<File> mSpillFiles; + protected FileList mSpillFiles; // Total size, including tuples on disk. Stored here so we don't have // to run through the disk when people ask. 
@@ -317,21 +317,6 @@ public abstract class DefaultAbstractBag } /** - * Need to override finalize to clean out the mSpillFiles array. - */ - @Override - protected void finalize() { - if (mSpillFiles != null) { - for (int i = 0; i < mSpillFiles.size(); i++) { - boolean res = mSpillFiles.get(i).delete(); - if (!res) - warn ("DefaultAbstractBag.finalize: failed to delete " + mSpillFiles.get(i), PigWarning.DELETE_FAILED, null); - - } - } - } - - /** * Get a file to spill contents to. The file will be registered in the * mSpillFiles array. * @return stream to write tuples to. @@ -339,7 +324,7 @@ public abstract class DefaultAbstractBag protected DataOutputStream getSpillFile() throws IOException { if (mSpillFiles == null) { // We want to keep the list as small as possible. - mSpillFiles = new ArrayList<File>(1); + mSpillFiles = new FileList(1); } String tmpDirName= System.getProperties().getProperty("java.io.tmpdir") ; Modified: hadoop/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java?rev=981984&r1=981983&r2=981984&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java Tue Aug 3 17:43:33 2010 @@ -38,7 +38,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.pig.PigCounters; import org.apache.pig.PigWarning; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger; @@ -492,6 +491,7 @@ public class DistinctDataBag extends Def // it efficiently. 
try { LinkedList<File> ll = new LinkedList<File>(mSpillFiles); + LinkedList<File> filesToDelete = new LinkedList<File>(); while (ll.size() > MAX_SPILL_FILES) { ListIterator<File> i = ll.listIterator(); mStreams = @@ -500,12 +500,15 @@ public class DistinctDataBag extends Def for (int j = 0; j < MAX_SPILL_FILES; j++) { try { + File f = i.next(); DataInputStream in = new DataInputStream(new BufferedInputStream( - new FileInputStream(i.next()))); + new FileInputStream(f))); mStreams.add(in); addToQueue(null, mStreams.size() - 1); i.remove(); + filesToDelete.add(f); + } catch (FileNotFoundException fnfe) { // We can't find our own spill file? That should // neer happen. @@ -534,9 +537,19 @@ public class DistinctDataBag extends Def throw new RuntimeException(msg, ioe); } } + // delete files that have been merged into new files + for(File f : filesToDelete){ + if( f.delete() == false){ + log.warn("Failed to delete spill file: " + f.getPath()); + } + } + + // clear the list, so that finalize does not delete any files, + // when mSpillFiles is assigned a new value + mSpillFiles.clear(); // Now, move our new list back to the spill files array. - mSpillFiles = new ArrayList<File>(ll); + mSpillFiles = new FileList(ll); } finally { // Reset mStreams and mMerge so that they'll be allocated // properly for regular merging. Added: hadoop/pig/trunk/src/org/apache/pig/data/FileList.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/FileList.java?rev=981984&view=auto ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/data/FileList.java (added) +++ hadoop/pig/trunk/src/org/apache/pig/data/FileList.java Tue Aug 3 17:43:33 2010 @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.data; + +import java.io.File; +import java.util.ArrayList; +import java.util.LinkedList; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * This class extends ArrayList<File> to add a finalize() that + * calls delete on the files it contains. + * This makes it possible to remove finalize() from classes such + * as DefaultAbstractBag, so they can be freed without waiting + * for finalize to be called. Only bags that have spilled to + * disk hold an instance of this class that needs to be finalized. + * + * CAUTION: if you assign a new value to a variable of this type, + * the files (if any) in the old object it pointed to will be scheduled for + * deletion. To avoid that, call .clear() before assigning a new value.
+ */ +public class FileList extends ArrayList<File> { + + private static final long serialVersionUID = 1L; + private static final Log log = LogFactory.getLog(FileList.class); + + public FileList(int i) { + super(i); + } + + public FileList(){ + } + + public FileList(LinkedList<File> ll) { + super(ll); + } + + @Override + protected void finalize(){ + for(File f : this){ + if(f.delete() == false){ + log.warn("Failed to delete file: " + f.getPath()); + } + } + } + +} Modified: hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java?rev=981984&r1=981983&r2=981984&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java Tue Aug 3 17:43:33 2010 @@ -23,20 +23,20 @@ import java.util.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.pig.PigCounters; +import org.apache.pig.PigWarning; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; public class InternalCachedBag extends DefaultAbstractBag { - private static final long serialVersionUID = 1L; - - private static final Log log = LogFactory.getLog(InternalCachedBag.class); + private static final long serialVersionUID = 1L; + + private static final Log log = LogFactory.getLog(InternalCachedBag.class); private transient int cacheLimit; private transient long maxMemUsage; private transient long memUsage; private transient DataOutputStream out; private transient boolean addDone; private transient TupleFactory factory; - public InternalCachedBag() { this(1); @@ -145,20 +145,12 @@ public class InternalCachedBag extends D public void clear() { if (!addDone) { - addDone(); + addDone(); } super.clear(); addDone = false; out = null; } - - protected void finalize() { - if 
(!addDone) { - // close the spill file so it can be deleted - addDone(); - } - super.finalize(); - } public boolean isDistinct() { return false; @@ -173,8 +165,8 @@ public class InternalCachedBag extends D // close the spill file and mark adding is done // so further adding is disallowed. addDone(); - } - return new CachedBagIterator(); + } + return new CachedBagIterator(); } public long spill() @@ -202,11 +194,12 @@ public class InternalCachedBag extends D } + public boolean hasNext() { - if (next != null) { - return true; - } - + if (next != null) { + return true; + } + if(iter.hasNext()){ next = iter.next(); return true; @@ -236,32 +229,21 @@ public class InternalCachedBag extends D } public Tuple next() { - if (next == null) { - if (!hasNext()) { - throw new IllegalStateException("No more elements from iterator"); - } - } - Tuple t = next; - next = null; - - return t; + if (next == null) { + if (!hasNext()) { + throw new NoSuchElementException("No more elements from iterator"); + } + } + Tuple t = next; + next = null; + + return t; } public void remove() { throw new UnsupportedOperationException("remove is not supported for CachedBagIterator"); } - protected void finalize() { - if(in != null) { - try - { - in.close(); - } - catch(Exception e) { - - } - } - } } } Modified: hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java?rev=981984&r1=981983&r2=981984&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java Tue Aug 3 17:43:33 2010 @@ -464,7 +464,9 @@ public class InternalDistinctBag extends // we'll be removing pieces from the middle and we want to do // it efficiently. 
try { + LinkedList<File> ll = new LinkedList<File>(mSpillFiles); + LinkedList<File> filesToDelete = new LinkedList<File>(); while (ll.size() > MAX_SPILL_FILES) { ListIterator<File> i = ll.listIterator(); mStreams = @@ -473,12 +475,14 @@ public class InternalDistinctBag extends for (int j = 0; j < MAX_SPILL_FILES; j++) { try { + File f = i.next(); DataInputStream in = new DataInputStream(new BufferedInputStream( - new FileInputStream(i.next()))); + new FileInputStream(f))); mStreams.add(in); addToQueue(null, mStreams.size() - 1); i.remove(); + filesToDelete.add(f); } catch (FileNotFoundException fnfe) { // We can't find our own spill file? That should // neer happen. @@ -507,9 +511,20 @@ public class InternalDistinctBag extends throw new RuntimeException(msg, ioe); } } - + + // delete files that have been merged into new files + for(File f : filesToDelete){ + if( f.delete() == false){ + log.warn("Failed to delete spill file: " + f.getPath()); + } + } + + // clear the list, so that finalize does not delete any files, + // when mSpillFiles is assigned a new value + mSpillFiles.clear(); + // Now, move our new list back to the spill files array. - mSpillFiles = new ArrayList<File>(ll); + mSpillFiles = new FileList(ll); } finally { // Reset mStreams and mMerge so that they'll be allocated // properly for regular merging. Modified: hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java?rev=981984&r1=981983&r2=981984&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java Tue Aug 3 17:43:33 2010 @@ -456,6 +456,7 @@ public class InternalSortedBag extends D // it efficiently. 
try { LinkedList<File> ll = new LinkedList<File>(mSpillFiles); + LinkedList<File> filesToDelete = new LinkedList<File>(); while (ll.size() > MAX_SPILL_FILES) { ListIterator<File> i = ll.listIterator(); mStreams = @@ -464,12 +465,15 @@ public class InternalSortedBag extends D for (int j = 0; j < MAX_SPILL_FILES; j++) { try { + File f = i.next(); DataInputStream in = new DataInputStream(new BufferedInputStream( - new FileInputStream(i.next()))); + new FileInputStream(f))); mStreams.add(in); addToQueue(null, mStreams.size() - 1); i.remove(); + filesToDelete.add(f); + } catch (FileNotFoundException fnfe) { // We can't find our own spill file? That should // neer happen. @@ -498,9 +502,19 @@ public class InternalSortedBag extends D throw new RuntimeException(msg, ioe); } } + // delete files that have been merged into new files + for(File f : filesToDelete){ + if( f.delete() == false){ + log.warn("Failed to delete spill file: " + f.getPath()); + } + } + + // clear the list, so that finalize does not delete any files, + // when mSpillFiles is assigned a new value + mSpillFiles.clear(); // Now, move our new list back to the spill files array. - mSpillFiles = new ArrayList<File>(ll); + mSpillFiles = new FileList(ll); } finally { // Reset mStreams and mMerge so that they'll be allocated // properly for regular merging. Modified: hadoop/pig/trunk/src/org/apache/pig/data/SortedDataBag.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/SortedDataBag.java?rev=981984&r1=981983&r2=981984&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/data/SortedDataBag.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/data/SortedDataBag.java Tue Aug 3 17:43:33 2010 @@ -450,6 +450,7 @@ public class SortedDataBag extends Defau // it efficiently. 
try { LinkedList<File> ll = new LinkedList<File>(mSpillFiles); + LinkedList<File> filesToDelete = new LinkedList<File>(); while (ll.size() > MAX_SPILL_FILES) { ListIterator<File> i = ll.listIterator(); mStreams = @@ -458,12 +459,14 @@ public class SortedDataBag extends Defau for (int j = 0; j < MAX_SPILL_FILES; j++) { try { + File f = i.next(); DataInputStream in = new DataInputStream(new BufferedInputStream( - new FileInputStream(i.next()))); + new FileInputStream(f))); mStreams.add(in); addToQueue(null, mStreams.size() - 1); i.remove(); + filesToDelete.add(f); } catch (FileNotFoundException fnfe) { // We can't find our own spill file? That should // neer happen. @@ -492,9 +495,20 @@ public class SortedDataBag extends Defau throw new RuntimeException(msg, ioe); } } + // delete files that have been merged into new files + for(File f : filesToDelete){ + if( f.delete() == false){ + log.warn("Failed to delete spill file: " + f.getPath()); + } + } + + // clear the list, so that finalize does not delete any files, + // when mSpillFiles is assigned a new value + mSpillFiles.clear(); // Now, move our new list back to the spill files array. - mSpillFiles = new ArrayList<File>(ll); + mSpillFiles = new FileList(ll); + } finally { // Reset mStreams and mMerge so that they'll be allocated // properly for regular merging.