Hi Paolo,

Looks good. I went ahead and changed MAX_SPILL_FILES to protected and wrote a few unit tests. I'll close the JIRA issue as well.
-Stephen On Fri, Mar 9, 2012 at 6:47 AM, Paolo Castagna <[email protected]> wrote: > Stephen, please have a look at this, give it a spin and let me know if we can > close JENA-157. > > I'd like to find a better way to test this, perhaps MAX_SPILL_FILES should be > protected, so we can change it and add a proper unit test. > > What do you think? > > Paolo > > [email protected] wrote: >> Author: castagna >> Date: Fri Mar 9 14:38:53 2012 >> New Revision: 1298851 >> >> URL: http://svn.apache.org/viewvc?rev=1298851&view=rev >> Log: >> JENA-157 - This adds a pre-merge phase to SortedDataBag to merge files 100 >> at the time max. If more than 100 files need to be merged, it is done in >> multiple rounds. >> >> Modified: >> >> incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java >> >> Modified: >> incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java >> URL: >> http://svn.apache.org/viewvc/incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java?rev=1298851&r1=1298850&r2=1298851&view=diff >> ============================================================================== >> --- >> incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java >> (original) >> +++ >> incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java >> Fri Mar 9 14:38:53 2012 >> @@ -62,6 +62,8 @@ import org.openjena.atlas.lib.Sink ; >> */ >> public class SortedDataBag<E> extends AbstractDataBag<E> >> { >> + private static final int MAX_SPILL_FILES = 100 ; // this is the maximum >> number of files to merge at the same time >> + >> protected final ThresholdPolicy<E> policy; >> protected final SerializationFactory<E> serializationFactory; >> protected final Comparator<? 
super E> comparator; >> @@ -177,8 +179,15 @@ public class SortedDataBag<E> extends Ab >> * @return an Iterator >> */ >> @Override >> + public Iterator<E> iterator() >> + { >> + preMerge(); >> + >> + return iterator(getSpillFiles().size()); >> + } >> + >> @SuppressWarnings({ "unchecked", "rawtypes" }) >> - public Iterator<E> iterator() >> + private Iterator<E> iterator(int size) >> { >> checkClosed(); >> >> @@ -197,15 +206,16 @@ public class SortedDataBag<E> extends Ab >> >> if (spilled) >> { >> - List<Iterator<E>> inputs = new ArrayList<Iterator<E>>(); >> + List<Iterator<E>> inputs = new ArrayList<Iterator<E>>(size + >> (memSize > 0 ? 1 : 0)); >> >> if (memSize > 0) >> { >> inputs.add(memory.iterator()); >> } >> >> - for (File spillFile : getSpillFiles()) >> + for ( int i = 0; i < size; i++ ) >> { >> + File spillFile = getSpillFiles().get(i); >> try >> { >> Iterator<E> irc = getInputIterator(spillFile); >> @@ -241,6 +251,38 @@ public class SortedDataBag<E> extends Ab >> } >> } >> >> + private void preMerge() { >> + if (getSpillFiles() == null || getSpillFiles().size() <= >> MAX_SPILL_FILES) { return; } >> + >> + try { >> + while ( getSpillFiles().size() > MAX_SPILL_FILES ) { >> + Sink<E> sink = >> serializationFactory.createSerializer(getSpillStream()) ; >> + Iterator<E> ssi = iterator(MAX_SPILL_FILES) ; >> + try { >> + while ( ssi.hasNext() ) { >> + sink.send( ssi.next() ); >> + } >> + } finally { >> + Iter.close(ssi) ; >> + sink.close() ; >> + } >> + >> + List<File> toRemove = new ArrayList<File>(MAX_SPILL_FILES) ; >> + for ( int i = 0; i < MAX_SPILL_FILES; i++ ) { >> + File file = getSpillFiles().get(i) ; >> + file.delete() ; >> + toRemove.add(file) ; >> + } >> + >> + getSpillFiles().removeAll(toRemove) ; >> + >> + memory = new ArrayList<E>() ; >> + } >> + } catch (IOException e) { >> + throw new AtlasException(e) ; >> + } >> + } >> + >> @Override >> public void close() >> { >> >> >
