Stephen, please have a look at this, give it a spin and let me know if we can
close JENA-157.

I'd like to find a better way to test this. Perhaps MAX_SPILL_FILES should be
protected, so that we can change it and add a proper unit test.

What do you think?

Paolo

[email protected] wrote:
> Author: castagna
> Date: Fri Mar  9 14:38:53 2012
> New Revision: 1298851
> 
> URL: http://svn.apache.org/viewvc?rev=1298851&view=rev
> Log:
> JENA-157 - This adds a pre-merge phase to SortedDataBag that merges at most
> 100 files at a time. If more than 100 files need to be merged, it is done in
> multiple rounds.
> 
> Modified:
>     
> incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java
> 
> Modified: 
> incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java
> URL: 
> http://svn.apache.org/viewvc/incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java?rev=1298851&r1=1298850&r2=1298851&view=diff
> ==============================================================================
> --- 
> incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java
>  (original)
> +++ 
> incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java
>  Fri Mar  9 14:38:53 2012
> @@ -62,6 +62,8 @@ import org.openjena.atlas.lib.Sink ;
>   */
>  public class SortedDataBag<E> extends AbstractDataBag<E>
>  {
> +    private static final int MAX_SPILL_FILES = 100 ; // this is the maximum 
> number of files to merge at the same time
> +
>      protected final ThresholdPolicy<E> policy;
>      protected final SerializationFactory<E> serializationFactory;
>      protected final Comparator<? super E> comparator;
> @@ -177,8 +179,15 @@ public class SortedDataBag<E> extends Ab
>       * @return an Iterator
>       */
>      @Override
> +     public Iterator<E> iterator()
> +    {
> +        preMerge();
> +
> +        return iterator(getSpillFiles().size());
> +    }
> +
>      @SuppressWarnings({ "unchecked", "rawtypes" })
> -    public Iterator<E> iterator()
> +    private Iterator<E> iterator(int size)
>      {
>          checkClosed();
>          
> @@ -197,15 +206,16 @@ public class SortedDataBag<E> extends Ab
>          
>          if (spilled)
>          {
> -            List<Iterator<E>> inputs = new ArrayList<Iterator<E>>();
> +            List<Iterator<E>> inputs = new ArrayList<Iterator<E>>(size + 
> (memSize > 0 ? 1 : 0));
>                          
>              if (memSize > 0)
>              {
>                  inputs.add(memory.iterator());
>              }
>              
> -            for (File spillFile : getSpillFiles())
> +            for ( int i = 0; i < size; i++ )
>              {
> +                File spillFile = getSpillFiles().get(i);
>                  try
>                  {
>                      Iterator<E> irc = getInputIterator(spillFile);
> @@ -241,6 +251,38 @@ public class SortedDataBag<E> extends Ab
>          }
>      }
>      
> +    private void preMerge() {
> +        if (getSpillFiles() == null || getSpillFiles().size() <= 
> MAX_SPILL_FILES) { return; }
> +
> +        try {
> +            while ( getSpillFiles().size() > MAX_SPILL_FILES ) {
> +                Sink<E> sink = 
> serializationFactory.createSerializer(getSpillStream()) ;
> +                Iterator<E> ssi = iterator(MAX_SPILL_FILES) ;
> +                try {
> +                    while ( ssi.hasNext() ) {
> +                        sink.send( ssi.next() );
> +                    }
> +                } finally {
> +                    Iter.close(ssi) ;
> +                    sink.close() ;
> +                }
> +                
> +                List<File> toRemove = new ArrayList<File>(MAX_SPILL_FILES) ;
> +                for ( int i = 0; i < MAX_SPILL_FILES; i++ ) {
> +                    File file = getSpillFiles().get(i) ;
> +                    file.delete() ;
> +                    toRemove.add(file) ;
> +                }
> +
> +                getSpillFiles().removeAll(toRemove) ;
> +
> +                memory = new ArrayList<E>() ;
> +            }            
> +        } catch (IOException e) {
> +            throw new AtlasException(e) ;
> +        }
> +    }
> +
>      @Override
>      public void close()
>      {
> 
> 

Reply via email to