Author: olga Date: Wed Sep 17 18:15:03 2008 New Revision: 696528 URL: http://svn.apache.org/viewvc?rev=696528&view=rev Log: merge from trunk for PIG-342-151
Added: incubator/pig/branches/types/test/org/apache/pig/test/TestBZip.java Modified: incubator/pig/branches/types/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2OutputStream.java incubator/pig/branches/types/src/org/apache/pig/data/DistinctDataBag.java incubator/pig/branches/types/test/org/apache/pig/test/TestDataBag.java Modified: incubator/pig/branches/types/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2OutputStream.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2OutputStream.java?rev=696528&r1=696527&r2=696528&view=diff ============================================================================== --- incubator/pig/branches/types/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2OutputStream.java (original) +++ incubator/pig/branches/types/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2OutputStream.java Wed Sep 17 18:15:03 2008 @@ -424,6 +424,11 @@ super.finalize(); } + // Written by close() instead of endBlock()/endCompression() when no data was written, so an empty file is still a readable bzip2 stream (presumably block-size digit '9', end-of-stream magic 0x177245385090, zero combined CRC) + final private static byte emptyFileArray[] = { + 0x39, 0x17, 0x72, 0x45, 0x38, 0x50, (byte) 0x90, 00, 00, 00, 00 + }; + public void close() throws IOException { if (closed) { return; @@ -434,8 +439,10 @@ } currentChar = -1; if (written){ - endBlock(); - endCompression(); + endBlock(); + endCompression(); + } else { + bsStream.write(emptyFileArray); } closed = true; super.close(); Modified: incubator/pig/branches/types/src/org/apache/pig/data/DistinctDataBag.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DistinctDataBag.java?rev=696528&r1=696527&r2=696528&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/data/DistinctDataBag.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/data/DistinctDataBag.java Wed Sep 17 18:15:03 2008 @@ -72,6 +72,28 @@ return true; } + + public long size() { + if (mSpillFiles != null && mSpillFiles.size() > 0){ + //We need to
recalculate size to guarantee a count of unique + //entries including those on disk + Iterator<Tuple> iter = iterator(); + int newSize = 0; // NOTE(review): int counter while size() returns long; confirm mSize's type + while (iter.hasNext()) { + newSize++; + iter.next(); + } + + synchronized(mContents) { + //we don't want adds to change our numbers + //the lock may need to cover more of the method + mSize = newSize; + } + } + return mSize; + } + + + public Iterator<Tuple> iterator() { return new DistinctDataBagIterator(); } @@ -88,7 +110,6 @@ @Override public void addAll(DataBag b) { synchronized (mContents) { - mSize += b.size(); Iterator<Tuple> i = b.iterator(); while (i.hasNext()) { if (mContents.add(i.next())) { Added: incubator/pig/branches/types/test/org/apache/pig/test/TestBZip.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestBZip.java?rev=696528&view=auto ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/TestBZip.java (added) +++ incubator/pig/branches/types/test/org/apache/pig/test/TestBZip.java Wed Sep 17 18:15:03 2008 @@ -0,0 +1,122 @@ +package org.apache.pig.test; + +import static org.apache.pig.ExecType.MAPREDUCE; + +import java.io.File; +import java.io.FileOutputStream; +import java.util.HashMap; +import java.util.Iterator; + +import junit.framework.TestCase; + +import org.apache.pig.PigServer; +import org.apache.pig.backend.local.datastorage.LocalSeekableInputStream; +import org.apache.pig.data.DataByteArray; +import org.apache.pig.data.DataType; +import org.apache.pig.data.Tuple; +import org.apache.tools.bzip2r.CBZip2InputStream; +import org.apache.tools.bzip2r.CBZip2OutputStream; +import org.junit.Test; + +public class TestBZip extends TestCase { + MiniCluster cluster = MiniCluster.buildCluster(); + + /** + * Tests the end-to-end writing and reading of a BZip file.
+ */ + @Test + public void testBzipInPig() throws Exception { + PigServer pig = new PigServer(MAPREDUCE); + try { + pig.deleteFile("junit-out.bz"); + } catch (Exception ignored) { // best-effort cleanup; the file may not exist + } + File in = File.createTempFile("junit", ".bz"); + in.deleteOnExit(); + File out = File.createTempFile("junit", ".bz"); + out.deleteOnExit(); + out.delete(); // only the unique name is needed; store() writes part files under this path + CBZip2OutputStream cos = new CBZip2OutputStream( + new FileOutputStream(in)); + for (int i = 1; i < 100; i++) { + cos.write((i + "\n").getBytes()); + cos.write((-i + "\n").getBytes()); + } + cos.close(); + pig.registerQuery("AA=load 'file:" + in.getAbsolutePath() + "';"); + pig.registerQuery("A=foreach (group (filter AA by $0 > 0) all) generate flatten($1);"); + pig.store("A", "file:" + out.getAbsolutePath()); + CBZip2InputStream cis = new CBZip2InputStream( + new LocalSeekableInputStream(new File(out, "part-00000.bz"))); + // Just a sanity check, to make sure it was a bzip file; we + // will do the value verification later + assertEquals(100, cis.read(new byte[100])); + cis.close(); + pig.registerQuery("B=load 'file:" + out.getAbsolutePath() + "';"); + Iterator<Tuple> i = pig.openIterator("B"); + HashMap<Integer, Integer> map = new HashMap<Integer, Integer>(); + while (i.hasNext()) { + Integer val = DataType.toInteger(i.next().get(0)); + map.put(val, val); + + } + assertEquals(99, map.size()); // only the positive values 1..99 survive the filter + for(int j = 1; j < 100; j++) { + assertEquals(Integer.valueOf(j), map.get(j)); + } + in.delete(); + out.delete(); // NOTE(review): no-op while out still holds part files (File.delete() needs an empty directory) + } + + /** + * Tests the end-to-end writing and reading of an empty BZip file.
+ */ + @Test + public void testEmptyBzipInPig() throws Exception { + PigServer pig = new PigServer(MAPREDUCE); + try { + pig.deleteFile("junit-out.bz"); + } catch (Exception ignored) { // best-effort cleanup; the file may not exist + } + File in = File.createTempFile("junit", ".tmp"); + in.deleteOnExit(); + File out = File.createTempFile("junit", ".bz"); + out.deleteOnExit(); + out.delete(); // only the unique name is needed; store() writes part files under this path + FileOutputStream fos = new FileOutputStream(in); + fos.write("55\n".getBytes()); + fos.close(); + assertEquals(3, in.length()); // input holds exactly "55\n" + pig.registerQuery("AA=load 'file:" + in.getAbsolutePath() + "';"); + pig + .registerQuery("A=foreach (group (filter AA by $0 < '0') all) generate flatten($1);"); + pig.store("A", "file:" + out.getAbsolutePath()); + CBZip2InputStream cis = new CBZip2InputStream( + new LocalSeekableInputStream(new File(out, "part-00000.bz"))); + assertEquals(-1, cis.read(new byte[100])); + cis.close(); + pig.registerQuery("B=load 'file:" + out.getAbsolutePath() + "';"); + assertFalse(pig.openIterator("B").hasNext()); // the empty bzip output must load back as zero tuples + in.delete(); + out.delete(); // NOTE(review): no-op while out still holds part files (File.delete() needs an empty directory) + } + + /** + * Tests the writing and reading of an empty BZip file.
+ */ + @Test + public void testEmptyBzip() throws Exception { + File tmp = File.createTempFile("junit", ".tmp"); + tmp.deleteOnExit(); + CBZip2OutputStream cos = new CBZip2OutputStream(new FileOutputStream( + tmp)); + cos.close(); + assertTrue(tmp.length() > 0); // close() must emit the empty-stream bytes even though nothing was written + CBZip2InputStream cis = new CBZip2InputStream( + new LocalSeekableInputStream(tmp)); + assertEquals(-1, cis.read(new byte[100])); + cis.close(); + tmp.delete(); + + } +} Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestDataBag.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestDataBag.java?rev=696528&r1=696527&r2=696528&view=diff ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/TestDataBag.java (original) +++ incubator/pig/branches/types/test/org/apache/pig/test/TestDataBag.java Wed Sep 17 18:15:03 2008 @@ -544,6 +544,8 @@ } mgr.forceSpill(); } + + assertEquals("Size of distinct data bag is incorrect", rightAnswer.size(), b.size()); // Read tuples back, hopefully they come out in the same order. Iterator<Tuple> bIter = b.iterator();