Hi Paolo,

Looks good.  I went ahead and changed MAX_SPILL_FILES to protected and
wrote a few unit tests.  I'll close the JIRA as well.

-Stephen

On Fri, Mar 9, 2012 at 6:47 AM, Paolo Castagna
<[email protected]> wrote:
> Stephen, please have a look at this, give it a spin and let me know if we can
> close JENA-157.
>
> I'd like to find a better way to test this. Perhaps MAX_SPILL_FILES should be
> protected, so we can change it and add a proper unit test.
>
> What do you think?
>
> Paolo
>
> [email protected] wrote:
>> Author: castagna
>> Date: Fri Mar  9 14:38:53 2012
>> New Revision: 1298851
>>
>> URL: http://svn.apache.org/viewvc?rev=1298851&view=rev
>> Log:
>> JENA-157 - This adds a pre-merge phase to SortedDataBag that merges at most
>> 100 files at a time. If more than 100 files need to be merged, this is done
>> in multiple rounds.
>>
>> Modified:
>>     incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java
>>
>> Modified: incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java
>> URL: http://svn.apache.org/viewvc/incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java?rev=1298851&r1=1298850&r2=1298851&view=diff
>> ==============================================================================
>> --- incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java (original)
>> +++ incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java Fri Mar  9 14:38:53 2012
>> @@ -62,6 +62,8 @@ import org.openjena.atlas.lib.Sink ;
>>   */
>>  public class SortedDataBag<E> extends AbstractDataBag<E>
>>  {
>> +    private static final int MAX_SPILL_FILES = 100 ; // this is the maximum number of files to merge at the same time
>> +
>>      protected final ThresholdPolicy<E> policy;
>>      protected final SerializationFactory<E> serializationFactory;
>>      protected final Comparator<? super E> comparator;
>> @@ -177,8 +179,15 @@ public class SortedDataBag<E> extends Ab
>>       * @return an Iterator
>>       */
>>      @Override
>> +    public Iterator<E> iterator()
>> +    {
>> +        preMerge();
>> +
>> +        return iterator(getSpillFiles().size());
>> +    }
>> +
>>      @SuppressWarnings({ "unchecked", "rawtypes" })
>> -    public Iterator<E> iterator()
>> +    private Iterator<E> iterator(int size)
>>      {
>>          checkClosed();
>>
>> @@ -197,15 +206,16 @@ public class SortedDataBag<E> extends Ab
>>
>>          if (spilled)
>>          {
>> -            List<Iterator<E>> inputs = new ArrayList<Iterator<E>>();
>> +            List<Iterator<E>> inputs = new ArrayList<Iterator<E>>(size + (memSize > 0 ? 1 : 0));
>>
>>              if (memSize > 0)
>>              {
>>                  inputs.add(memory.iterator());
>>              }
>>
>> -            for (File spillFile : getSpillFiles())
>> +            for ( int i = 0; i < size; i++ )
>>              {
>> +                File spillFile = getSpillFiles().get(i);
>>                  try
>>                  {
>>                      Iterator<E> irc = getInputIterator(spillFile);
>> @@ -241,6 +251,38 @@ public class SortedDataBag<E> extends Ab
>>          }
>>      }
>>
>> +    private void preMerge() {
>> +        if (getSpillFiles() == null || getSpillFiles().size() <= MAX_SPILL_FILES) { return; }
>> +
>> +        try {
>> +            while ( getSpillFiles().size() > MAX_SPILL_FILES ) {
>> +                Sink<E> sink = serializationFactory.createSerializer(getSpillStream()) ;
>> +                Iterator<E> ssi = iterator(MAX_SPILL_FILES) ;
>> +                try {
>> +                    while ( ssi.hasNext() ) {
>> +                        sink.send( ssi.next() );
>> +                    }
>> +                } finally {
>> +                    Iter.close(ssi) ;
>> +                    sink.close() ;
>> +                }
>> +
>> +                List<File> toRemove = new ArrayList<File>(MAX_SPILL_FILES) ;
>> +                for ( int i = 0; i < MAX_SPILL_FILES; i++ ) {
>> +                    File file = getSpillFiles().get(i) ;
>> +                    file.delete() ;
>> +                    toRemove.add(file) ;
>> +                }
>> +
>> +                getSpillFiles().removeAll(toRemove) ;
>> +
>> +                memory = new ArrayList<E>() ;
>> +            }
>> +        } catch (IOException e) {
>> +            throw new AtlasException(e) ;
>> +        }
>> +    }
>> +
>>      @Override
>>      public void close()
>>      {
>>
>>
>
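The pre-merge in the commit above repeats one step: merge at most MAX_SPILL_FILES sorted runs into a single new spill file, delete the inputs, and stop once the remaining count fits in one final merge. Below is a minimal sketch of that idea outside ARQ, assuming in-memory sorted lists stand in for spill files. The class `CappedMergeSketch`, its methods, and the `maxFanIn` parameter are hypothetical names for illustration, not the SortedDataBag API. The cap is passed as a parameter rather than fixed as a constant, which connects to the testability point raised in the thread: a small cap lets a test force several rounds cheaply.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch: each inner List stands in for one sorted spill file,
// and maxFanIn plays the role of MAX_SPILL_FILES. Not the ARQ implementation.
public class CappedMergeSketch {

    // Current head element of one run, plus the iterator it came from.
    private static final class Head<E> {
        final E value;
        final Iterator<E> source;

        Head(E value, Iterator<E> source) {
            this.value = value;
            this.source = source;
        }
    }

    // Standard k-way merge of already-sorted runs using a min-heap.
    static <E> List<E> kWayMerge(List<List<E>> runs, Comparator<? super E> cmp) {
        Comparator<Head<E>> byValue = (a, b) -> cmp.compare(a.value, b.value);
        PriorityQueue<Head<E>> heap = new PriorityQueue<>(byValue);
        for (List<E> run : runs) {
            Iterator<E> it = run.iterator();
            if (it.hasNext()) {
                heap.add(new Head<>(it.next(), it));
            }
        }
        List<E> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            Head<E> h = heap.poll();
            out.add(h.value);
            if (h.source.hasNext()) {
                heap.add(new Head<>(h.source.next(), h.source));
            }
        }
        return out;
    }

    // Pre-merge: while more than maxFanIn runs remain, merge the first maxFanIn
    // into one new run appended at the end, then drop the inputs.
    // Returns the number of rounds performed.
    static <E> int preMerge(List<List<E>> runs, int maxFanIn, Comparator<? super E> cmp) {
        // A fan-in below 2 would never reduce the run count, so reject it.
        if (maxFanIn < 2) {
            throw new IllegalArgumentException("maxFanIn must be at least 2");
        }
        int rounds = 0;
        while (runs.size() > maxFanIn) {
            List<List<E>> batch = runs.subList(0, maxFanIn);
            List<E> merged = kWayMerge(batch, cmp);
            batch.clear();     // "delete" the merged spill files
            runs.add(merged);  // the merged output becomes a new spill file
            rounds++;
        }
        return rounds;
    }

    // Full sort: pre-merge down to at most maxFanIn runs, then one final merge.
    static <E> List<E> mergeAll(List<List<E>> runs, int maxFanIn, Comparator<? super E> cmp) {
        preMerge(runs, maxFanIn, cmp);
        return kWayMerge(runs, cmp);
    }

    public static void main(String[] args) {
        // Ten sorted runs of three ints each; a fan-in of 3 forces several rounds.
        List<List<Integer>> runs = new ArrayList<>();
        for (int r = 0; r < 10; r++) {
            runs.add(Arrays.asList(r, r + 10, r + 20));
        }
        List<Integer> sorted = mergeAll(runs, 3, Comparator.<Integer>naturalOrder());
        System.out.println(sorted); // prints the integers 0 to 29 in ascending order
    }
}
```

Each round replaces K runs with one, so the run count drops by K - 1 per round. With 10 runs and K = 3 that is ceil((10 - 3) / 2) = 4 rounds before the final merge. In the diff, `iterator(MAX_SPILL_FILES)` also includes the in-memory buffer when it is non-empty, which is why `memory` is reset after each round. The sketch leaves that part out.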
