Stephen, please have a look at this, give it a spin, and let me know whether we can close JENA-157.
I'd like to find a better way to test this; perhaps MAX_SPILL_FILES should be made protected so that a test can change it, and then we can add a proper unit test. What do you think?

Paolo

[email protected] wrote: > Author: castagna > Date: Fri Mar 9 14:38:53 2012 > New Revision: 1298851 > > URL: http://svn.apache.org/viewvc?rev=1298851&view=rev > Log: > JENA-157 - This adds a pre-merge phase to SortedDataBag to merge files 100 at > the time max. If more than 100 files need to be merged, it is done in > multiple rounds. > > Modified: > > incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java > > Modified: > incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java > URL: > http://svn.apache.org/viewvc/incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java?rev=1298851&r1=1298850&r2=1298851&view=diff > ============================================================================== > --- > incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java > (original) > +++ > incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java > Fri Mar 9 14:38:53 2012 > @@ -62,6 +62,8 @@ import org.openjena.atlas.lib.Sink ; > */ > public class SortedDataBag<E> extends AbstractDataBag<E> > { > + private static final int MAX_SPILL_FILES = 100 ; // this is the maximum > number of files to merge at the same time > + > protected final ThresholdPolicy<E> policy; > protected final SerializationFactory<E> serializationFactory; > protected final Comparator<? 
super E> comparator; > @@ -177,8 +179,15 @@ public class SortedDataBag<E> extends Ab > * @return an Iterator > */ > @Override > + public Iterator<E> iterator() > + { > + preMerge(); > + > + return iterator(getSpillFiles().size()); > + } > + > @SuppressWarnings({ "unchecked", "rawtypes" }) > - public Iterator<E> iterator() > + private Iterator<E> iterator(int size) > { > checkClosed(); > > @@ -197,15 +206,16 @@ public class SortedDataBag<E> extends Ab > > if (spilled) > { > - List<Iterator<E>> inputs = new ArrayList<Iterator<E>>(); > + List<Iterator<E>> inputs = new ArrayList<Iterator<E>>(size + > (memSize > 0 ? 1 : 0)); > > if (memSize > 0) > { > inputs.add(memory.iterator()); > } > > - for (File spillFile : getSpillFiles()) > + for ( int i = 0; i < size; i++ ) > { > + File spillFile = getSpillFiles().get(i); > try > { > Iterator<E> irc = getInputIterator(spillFile); > @@ -241,6 +251,38 @@ public class SortedDataBag<E> extends Ab > } > } > > + private void preMerge() { > + if (getSpillFiles() == null || getSpillFiles().size() <= > MAX_SPILL_FILES) { return; } > + > + try { > + while ( getSpillFiles().size() > MAX_SPILL_FILES ) { > + Sink<E> sink = > serializationFactory.createSerializer(getSpillStream()) ; > + Iterator<E> ssi = iterator(MAX_SPILL_FILES) ; > + try { > + while ( ssi.hasNext() ) { > + sink.send( ssi.next() ); > + } > + } finally { > + Iter.close(ssi) ; > + sink.close() ; > + } > + > + List<File> toRemove = new ArrayList<File>(MAX_SPILL_FILES) ; > + for ( int i = 0; i < MAX_SPILL_FILES; i++ ) { > + File file = getSpillFiles().get(i) ; > + file.delete() ; > + toRemove.add(file) ; > + } > + > + getSpillFiles().removeAll(toRemove) ; > + > + memory = new ArrayList<E>() ; > + } > + } catch (IOException e) { > + throw new AtlasException(e) ; > + } > + } > + > @Override > public void close() > { > >
