On Thu, Oct 30, 2008 at 10:20 AM,  <nextgens at freenetproject.org> wrote:
> Author: nextgens
> Date: 2008-10-30 02:20:49 +0000 (Thu, 30 Oct 2008)
> New Revision: 23211
>
> Added:
>   trunk/freenet/src/freenet/support/compress/CompressJob.java
>   trunk/freenet/src/freenet/support/compress/RealCompressor.java
> Modified:
>   trunk/freenet/src/freenet/client/InsertContext.java
>   trunk/freenet/src/freenet/client/async/SingleFileInserter.java
> Log:
> More untested code on the serialized compressor. It shouldn't run out of mem 
> anymore

This change give these error: http://code.bulix.org/vec613-68848?raw


> Modified: trunk/freenet/src/freenet/client/InsertContext.java
> ===================================================================
> --- trunk/freenet/src/freenet/client/InsertContext.java 2008-10-29 23:22:36 
> UTC (rev 23210)
> +++ trunk/freenet/src/freenet/client/InsertContext.java 2008-10-30 02:20:49 
> UTC (rev 23211)
> @@ -10,6 +10,7 @@
>  import freenet.crypt.RandomSource;
>  import freenet.support.Executor;
>  import freenet.support.api.BucketFactory;
> +import freenet.support.compress.RealCompressor;
>  import freenet.support.io.NullPersistentFileTracker;
>  import freenet.support.io.PersistentFileTracker;
>
> @@ -34,6 +35,7 @@
>        public final USKManager uskManager;
>        public final BackgroundBlockEncoder backgroundBlockEncoder;
>        public final Executor executor;
> +       public final RealCompressor compressor;
>
>        public InsertContext(BucketFactory bf, BucketFactory persistentBF, 
> PersistentFileTracker tracker, RandomSource random,
>                        int maxRetries, int rnfsToSuccess, int maxThreads, int 
> splitfileSegmentDataBlocks, int splitfileSegmentCheckBlocks,
> @@ -54,6 +56,8 @@
>                this.cacheLocalRequests = cacheLocalRequests;
>                this.backgroundBlockEncoder = blockEncoder;
>                this.executor = executor;
> +               this.compressor = new RealCompressor(executor);
> +               executor.execute(compressor, "Compression scheduler");
>        }
>
>        public InsertContext(InsertContext ctx, SimpleEventProducer producer, 
> boolean forceNonPersistent) {
> @@ -73,6 +77,7 @@
>                this.cacheLocalRequests = ctx.cacheLocalRequests;
>                this.backgroundBlockEncoder = ctx.backgroundBlockEncoder;
>                this.executor = ctx.executor;
> +               this.compressor = ctx.compressor;
>        }
>
>        public InsertContext(InsertContext ctx, SimpleEventProducer producer) {
> @@ -92,6 +97,7 @@
>                this.cacheLocalRequests = ctx.cacheLocalRequests;
>                this.backgroundBlockEncoder = ctx.backgroundBlockEncoder;
>                this.executor = ctx.executor;
> +               this.compressor = ctx.compressor;
>        }
>
>  }
>
> Modified: trunk/freenet/src/freenet/client/async/SingleFileInserter.java
> ===================================================================
> --- trunk/freenet/src/freenet/client/async/SingleFileInserter.java      
> 2008-10-29 23:22:36 UTC (rev 23210)
> +++ trunk/freenet/src/freenet/client/async/SingleFileInserter.java      
> 2008-10-30 02:20:49 UTC (rev 23211)
> @@ -18,15 +18,14 @@
>  import freenet.keys.SSKBlock;
>  import freenet.node.PrioRunnable;
>  import freenet.support.Logger;
> -import freenet.support.OOMHandler;
>  import freenet.support.SimpleFieldSet;
>  import freenet.support.api.Bucket;
> +import freenet.support.compress.CompressJob;
>  import freenet.support.compress.CompressionOutputSizeException;
>  import freenet.support.compress.Compressor.COMPRESSOR_TYPE;
>  import freenet.support.io.BucketChainBucketFactory;
>  import freenet.support.io.BucketTools;
>  import freenet.support.io.NativeThread;
> -import java.util.concurrent.Semaphore;
>
>  /**
>  * Attempt to insert a file. May include metadata.
> @@ -35,7 +34,7 @@
>  * Attempt to compress the file. Off-thread if it will take a while.
>  * Then hand it off to SimpleFileInserter.
>  */
> -class SingleFileInserter implements ClientPutState {
> +public class SingleFileInserter implements ClientPutState, CompressJob {
>
>        private static boolean logMINOR;
>        final BaseClientPutter parent;
> @@ -106,45 +105,10 @@
>                        }
>                }
>                // Run off thread in any case
> -               OffThreadCompressor otc = new OffThreadCompressor();
> -               ctx.executor.execute(otc, "Compressor for " + this);
> +               ctx.compressor.enqueueNewJob(this);
>        }
>
> -       private  class OffThreadCompressor implements PrioRunnable {
> -               public void run() {
> -                   freenet.support.Logger.OSThread.logPID(this);
> -                       try {
> -                               tryCompress();
> -                       } catch (InsertException e) {
> -                               cb.onFailure(e, SingleFileInserter.this);
> -            } catch (OutOfMemoryError e) {
> -                               OOMHandler.handleOOM(e);
> -                               System.err.println("OffThreadCompressor 
> thread above failed.");
> -                               // Might not be heap, so try anyway
> -                               cb.onFailure(new 
> InsertException(InsertException.INTERNAL_ERROR, e, null), 
> SingleFileInserter.this);
> -            } catch (Throwable t) {
> -                Logger.error(this, "Caught in OffThreadCompressor: "+t, t);
> -                System.err.println("Caught in OffThreadCompressor: "+t);
> -                t.printStackTrace();
> -                // Try to fail gracefully
> -                               cb.onFailure(new 
> InsertException(InsertException.INTERNAL_ERROR, t, null), 
> SingleFileInserter.this);
> -                       }
> -               }
> -
> -               public int getPriority() {
> -                       return NativeThread.LOW_PRIORITY;
> -               }
> -       }
> -
> -       private void tryCompress() throws InsertException {
> -               try {
> -                       try {
> -                               COMPRESSOR_TYPE.compressorSemaphore.acquire();
> -                       } catch (InterruptedException e) {
> -                               // should not happen
> -                               Logger.error(this, "Caught an 
> InterruptedException:"+e.getMessage(), e);
> -                               throw new 
> InsertException(InsertException.INTERNAL_ERROR, e, null);
> -                       }
> +       public void tryCompress() throws InsertException {
>                // First, determine how small it needs to be
>                Bucket origData = block.getData();
>                Bucket data = origData;
> @@ -224,91 +188,111 @@
>                        if(tryCompress)
>                                ctx.eventProducer.produceEvent(new 
> FinishedCompressionEvent(bestCodec == null ? -1 : bestCodec.metadataID, 
> origSize, data.size()));
>                        if(logMINOR) Logger.minor(this, "Compressed 
> "+origSize+" to "+data.size()+" on "+this);
> -               }
> -
> -               // Compressed data
> -
> -               // Insert it...
> -               short codecNumber = bestCodec == null ? -1 : 
> bestCodec.metadataID;
> -               long compressedDataSize = data.size();
> -               boolean fitsInOneBlockAsIs = bestCodec == null ? 
> compressedDataSize < blockSize : compressedDataSize < oneBlockCompressedSize;
> -               boolean fitsInOneCHK = bestCodec == null ? compressedDataSize 
> < CHKBlock.DATA_LENGTH : compressedDataSize < 
> CHKBlock.MAX_COMPRESSED_DATA_LENGTH;
> +               }
> +               // Compressed data ; now insert it
> +               // We do it off thread so that RealCompressor can release the 
> semaphore
> +               final COMPRESSOR_TYPE fbestCodec = bestCodec;
> +               final Bucket fdata = data;
> +               final int foneBlockCompressedSize = oneBlockCompressedSize;
> +               final int fblockSize = blockSize;
> +               final long forigSize = origSize;
> +               final boolean fshouldFreeData = shouldFreeData;
> +               ctx.executor.execute(new PrioRunnable() {
>
> -               if((fitsInOneBlockAsIs || fitsInOneCHK) && 
> block.getData().size() > Integer.MAX_VALUE)
> -                       throw new 
> InsertException(InsertException.INTERNAL_ERROR, "2GB+ should not encode to 
> one block!", null);
> +                       public int getPriority() {
> +                               return NativeThread.NORM_PRIORITY;
> +                       }
>
> -               boolean noMetadata = ((block.clientMetadata == null) || 
> block.clientMetadata.isTrivial()) && targetFilename == null;
> -               if(noMetadata && archiveType == null) {
> -                       if(fitsInOneBlockAsIs) {
> -                               // Just insert it
> -                               ClientPutState bi =
> -                                       createInserter(parent, data, 
> codecNumber, block.desiredURI, ctx, cb, metadata, 
> (int)block.getData().size(), -1, getCHKOnly, true, true);
> -                               cb.onTransition(this, bi);
> -                               bi.schedule();
> -                               cb.onBlockSetFinished(this);
> +                       public void run() {
> +                               insert(fbestCodec, fdata, 
> foneBlockCompressedSize, fblockSize, forigSize, fshouldFreeData);
> +                       }
> +               }, "Insert thread for "+this);
> +       }
> +
> +       private void insert(COMPRESSOR_TYPE bestCodec, Bucket data, int 
> oneBlockCompressedSize, int blockSize, long origSize, boolean shouldFreeData) 
> {
> +               try {
> +                       // Insert it...
> +                       short codecNumber = bestCodec == null ? -1 : 
> bestCodec.metadataID;
> +                       long compressedDataSize = data.size();
> +                       boolean fitsInOneBlockAsIs = bestCodec == null ? 
> compressedDataSize < blockSize : compressedDataSize < oneBlockCompressedSize;
> +                       boolean fitsInOneCHK = bestCodec == null ? 
> compressedDataSize < CHKBlock.DATA_LENGTH : compressedDataSize < 
> CHKBlock.MAX_COMPRESSED_DATA_LENGTH;
> +
> +                       if((fitsInOneBlockAsIs || fitsInOneCHK) && 
> block.getData().size() > Integer.MAX_VALUE)
> +                               throw new 
> InsertException(InsertException.INTERNAL_ERROR, "2GB+ should not encode to 
> one block!", null);
> +
> +                       boolean noMetadata = ((block.clientMetadata == null) 
> || block.clientMetadata.isTrivial()) && targetFilename == null;
> +                       if(noMetadata && archiveType == null)
> +                               if(fitsInOneBlockAsIs) {
> +                                       // Just insert it
> +                                       ClientPutState bi =
> +                                               createInserter(parent, data, 
> codecNumber, block.desiredURI, ctx, cb, metadata, (int) 
> block.getData().size(), -1, getCHKOnly, true, true);
> +                                       cb.onTransition(this, bi);
> +                                       bi.schedule();
> +                                       cb.onBlockSetFinished(this);
> +                                       return;
> +                               }
> +                       if(fitsInOneCHK) {
> +                               // Insert single block, then insert pointer 
> to it
> +                               if(reportMetadataOnly) {
> +                                       SingleBlockInserter dataPutter = new 
> SingleBlockInserter(parent, data, codecNumber, FreenetURI.EMPTY_CHK_URI, ctx, 
> cb, metadata, (int) origSize, -1, getCHKOnly, true, true, token);
> +                                       Metadata meta = 
> makeMetadata(archiveType, bestCodec, dataPutter.getURI());
> +                                       cb.onMetadata(meta, this);
> +                                       cb.onTransition(this, dataPutter);
> +                                       dataPutter.schedule();
> +                                       cb.onBlockSetFinished(this);
> +                               } else {
> +                                       MultiPutCompletionCallback mcb =
> +                                               new 
> MultiPutCompletionCallback(cb, parent, token);
> +                                       SingleBlockInserter dataPutter = new 
> SingleBlockInserter(parent, data, codecNumber, FreenetURI.EMPTY_CHK_URI, ctx, 
> mcb, metadata, (int) origSize, -1, getCHKOnly, true, false, token);
> +                                       Metadata meta = 
> makeMetadata(archiveType, bestCodec, dataPutter.getURI());
> +                                       Bucket metadataBucket;
> +                                       try {
> +                                               metadataBucket = 
> BucketTools.makeImmutableBucket(ctx.bf, meta.writeToByteArray());
> +                                       } catch(IOException e) {
> +                                               Logger.error(this, "Caught " 
> + e, e);
> +                                               throw new 
> InsertException(InsertException.BUCKET_ERROR, e, null);
> +                                       } catch(MetadataUnresolvedException 
> e) {
> +                                               // Impossible, we're not 
> inserting a manifest.
> +                                               Logger.error(this, "Caught " 
> + e, e);
> +                                               throw new 
> InsertException(InsertException.INTERNAL_ERROR, "Got 
> MetadataUnresolvedException in SingleFileInserter: " + e.toString(), null);
> +                                       }
> +                                       ClientPutState metaPutter = 
> createInserter(parent, metadataBucket, (short) -1, block.desiredURI, ctx, 
> mcb, true, (int) origSize, -1, getCHKOnly, true, false);
> +                                       mcb.addURIGenerator(metaPutter);
> +                                       mcb.add(dataPutter);
> +                                       cb.onTransition(this, mcb);
> +                                       Logger.minor(this, "" + mcb + " : 
> data " + dataPutter + " meta " + metaPutter);
> +                                       mcb.arm();
> +                                       dataPutter.schedule();
> +                                       if(metaPutter instanceof 
> SingleBlockInserter)
> +                                               ((SingleBlockInserter) 
> metaPutter).encode();
> +                                       metaPutter.schedule();
> +                                       cb.onBlockSetFinished(this);
> +                               }
>                                return;
>                        }
> -               }
> -               if (fitsInOneCHK) {
> -                       // Insert single block, then insert pointer to it
> +                       // Otherwise the file is too big to fit into one block
> +                       // We therefore must make a splitfile
> +                       // Job of SplitHandler: when the splitinserter has 
> the metadata,
> +                       // insert it. Then when the splitinserter has 
> finished, and the
> +                       // metadata insert has finished too, tell the master 
> callback.
>                        if(reportMetadataOnly) {
> -                               SingleBlockInserter dataPutter = new 
> SingleBlockInserter(parent, data, codecNumber, FreenetURI.EMPTY_CHK_URI, ctx, 
> cb, metadata, (int)origSize, -1, getCHKOnly, true, true, token);
> -                               Metadata meta = makeMetadata(archiveType, 
> bestCodec, dataPutter.getURI());
> -                               cb.onMetadata(meta, this);
> -                               cb.onTransition(this, dataPutter);
> -                               dataPutter.schedule();
> -                               cb.onBlockSetFinished(this);
> +                               SplitFileInserter sfi = new 
> SplitFileInserter(parent, cb, data, bestCodec, origSize, 
> block.clientMetadata, ctx, getCHKOnly, metadata, token, archiveType, 
> shouldFreeData);
> +                               cb.onTransition(this, sfi);
> +                               sfi.start();
> +                               if(earlyEncode)
> +                                       sfi.forceEncode();
>                        } else {
> -                               MultiPutCompletionCallback mcb =
> -                                       new MultiPutCompletionCallback(cb, 
> parent, token);
> -                               SingleBlockInserter dataPutter = new 
> SingleBlockInserter(parent, data, codecNumber, FreenetURI.EMPTY_CHK_URI, ctx, 
> mcb, metadata, (int)origSize, -1, getCHKOnly, true, false, token);
> -                               Metadata meta = makeMetadata(archiveType, 
> bestCodec, dataPutter.getURI());
> -                               Bucket metadataBucket;
> -                               try {
> -                                       metadataBucket = 
> BucketTools.makeImmutableBucket(ctx.bf, meta.writeToByteArray());
> -                               } catch (IOException e) {
> -                                       Logger.error(this, "Caught "+e, e);
> -                                       throw new 
> InsertException(InsertException.BUCKET_ERROR, e, null);
> -                               } catch (MetadataUnresolvedException e) {
> -                                       // Impossible, we're not inserting a 
> manifest.
> -                                       Logger.error(this, "Caught "+e, e);
> -                                       throw new 
> InsertException(InsertException.INTERNAL_ERROR, "Got 
> MetadataUnresolvedException in SingleFileInserter: "+e.toString(), null);
> -                               }
> -                               ClientPutState metaPutter = 
> createInserter(parent, metadataBucket, (short) -1, block.desiredURI, ctx, 
> mcb, true, (int)origSize, -1, getCHKOnly, true, false);
> -                               mcb.addURIGenerator(metaPutter);
> -                               mcb.add(dataPutter);
> -                               cb.onTransition(this, mcb);
> -                               Logger.minor(this, ""+mcb+" : data 
> "+dataPutter+" meta "+metaPutter);
> -                               mcb.arm();
> -                               dataPutter.schedule();
> -                               if(metaPutter instanceof SingleBlockInserter)
> -                                       
> ((SingleBlockInserter)metaPutter).encode();
> -                               metaPutter.schedule();
> -                               cb.onBlockSetFinished(this);
> +                               SplitHandler sh = new SplitHandler();
> +                               SplitFileInserter sfi = new 
> SplitFileInserter(parent, sh, data, bestCodec, origSize, 
> block.clientMetadata, ctx, getCHKOnly, metadata, token, archiveType, 
> shouldFreeData);
> +                               sh.sfi = sfi;
> +                               cb.onTransition(this, sh);
> +                               sfi.start();
> +                               if(earlyEncode)
> +                                       sfi.forceEncode();
>                        }
> -                       return;
> +               } catch(InsertException e) {
> +                       onFailure(e, this);
>                }
> -               // Otherwise the file is too big to fit into one block
> -               // We therefore must make a splitfile
> -               // Job of SplitHandler: when the splitinserter has the 
> metadata,
> -               // insert it. Then when the splitinserter has finished, and 
> the
> -               // metadata insert has finished too, tell the master callback.
> -               if(reportMetadataOnly) {
> -                       SplitFileInserter sfi = new SplitFileInserter(parent, 
> cb, data, bestCodec, origSize, block.clientMetadata, ctx, getCHKOnly, 
> metadata, token, archiveType, shouldFreeData);
> -                       cb.onTransition(this, sfi);
> -                       sfi.start();
> -                       if(earlyEncode) sfi.forceEncode();
> -               } else {
> -                       SplitHandler sh = new SplitHandler();
> -                       SplitFileInserter sfi = new SplitFileInserter(parent, 
> sh, data, bestCodec, origSize, block.clientMetadata, ctx, getCHKOnly, 
> metadata, token, archiveType, shouldFreeData);
> -                       sh.sfi = sfi;
> -                       cb.onTransition(this, sh);
> -                       sfi.start();
> -                       if(earlyEncode) sfi.forceEncode();
> -               }
> -               } finally {
> -               COMPRESSOR_TYPE.compressorSemaphore.release();
> -               }
>        }
>
>        private Metadata makeMetadata(ARCHIVE_TYPE archiveType, 
> COMPRESSOR_TYPE codec, FreenetURI uri) {
> @@ -722,4 +706,11 @@
>        public SimpleFieldSet getProgressFieldset() {
>                return null;
>        }
> +
> +       public void onFailure(InsertException e, ClientPutState c) {
> +               if(cb != null)
> +                       cb.onFailure(e, c);
> +               else
> +                       Logger.error(this, "The callback is null but we have 
> onFailure to call!");
> +       }
>  }
>
> Added: trunk/freenet/src/freenet/support/compress/CompressJob.java
> ===================================================================
> --- trunk/freenet/src/freenet/support/compress/CompressJob.java               
>           (rev 0)
> +++ trunk/freenet/src/freenet/support/compress/CompressJob.java 2008-10-30 
> 02:20:49 UTC (rev 23211)
> @@ -0,0 +1,9 @@
> +package freenet.support.compress;
> +
> +import freenet.client.InsertException;
> +import freenet.client.async.ClientPutState;
> +
> +public interface CompressJob {
> +       public abstract void tryCompress() throws InsertException;
> +       public abstract void onFailure(InsertException e, ClientPutState c);
> +}
>
> Added: trunk/freenet/src/freenet/support/compress/RealCompressor.java
> ===================================================================
> --- trunk/freenet/src/freenet/support/compress/RealCompressor.java            
>                   (rev 0)
> +++ trunk/freenet/src/freenet/support/compress/RealCompressor.java      
> 2008-10-30 02:20:49 UTC (rev 23211)
> @@ -0,0 +1,86 @@
> +/* This code is part of Freenet. It is distributed under the GNU General
> + * Public License, version 2 (or at your option any later version). See
> + * http://www.gnu.org/ for further details of the GPL. */
> +package freenet.support.compress;
> +
> +import freenet.client.InsertException;
> +import freenet.node.PrioRunnable;
> +import freenet.support.Executor;
> +import freenet.support.Logger;
> +import freenet.support.OOMHandler;
> +import freenet.support.io.NativeThread;
> +import java.util.LinkedList;
> +
> +public class RealCompressor implements PrioRunnable {
> +
> +       private final Executor exec;
> +       private static final LinkedList<CompressJob> _awaitingJobs = new 
> LinkedList<CompressJob>();
> +
> +       public RealCompressor(Executor e) {
> +               this.exec = e;
> +       }
> +
> +       public int getPriority() {
> +               return NativeThread.HIGH_PRIORITY;
> +       }
> +
> +       public synchronized void enqueueNewJob(CompressJob j) {
> +               _awaitingJobs.add(j);
> +               notifyAll();
> +       }
> +
> +       public void run() {
> +               Logger.normal(this, "Starting RealCompressor");
> +               while(true) {
> +                       CompressJob currentJob = null;
> +                       try {
> +                               synchronized(this) {
> +                                       wait();
> +                                       currentJob = _awaitingJobs.poll();
> +                               }
> +                               if(currentJob == null)
> +                                       continue;
> +                               
> Compressor.COMPRESSOR_TYPE.compressorSemaphore.acquire();
> +                       } catch(InterruptedException e) {
> +                               Logger.error(this, "caught: "+e.getMessage(), 
> e);
> +                               continue;
> +                       }
> +
> +                       final CompressJob finalJob = currentJob;
> +                       exec.execute(new PrioRunnable() {
> +                               public void run() {
> +                                       
> freenet.support.Logger.OSThread.logPID(this);
> +                                       try {
> +                                               while(true) {
> +                                                       try {
> +                                                               
> finalJob.tryCompress();
> +                                                       } 
> catch(InsertException e) {
> +                                                               
> finalJob.onFailure(e, null);
> +                                                       } 
> catch(OutOfMemoryError e) {
> +                                                               
> OOMHandler.handleOOM(e);
> +                                                               
> System.err.println("OffThreadCompressor thread above failed.");
> +                                                               // Might not 
> be heap, so try anyway
> +                                                               
> finalJob.onFailure(new InsertException(InsertException.INTERNAL_ERROR, e, 
> null), null);
> +                                                       } catch(Throwable t) {
> +                                                               
> Logger.error(this, "Caught in OffThreadCompressor: " + t, t);
> +                                                               
> System.err.println("Caught in OffThreadCompressor: " + t);
> +                                                               
> t.printStackTrace();
> +                                                               // Try to 
> fail gracefully
> +                                                               
> finalJob.onFailure(new InsertException(InsertException.INTERNAL_ERROR, t, 
> null), null);
> +                                                       }
> +
> +                                               }
> +                                       } catch(Throwable t) {
> +                                               Logger.error(this, "Caught " 
> + t + " in " + this, t);
> +                                       } finally {
> +                                               
> Compressor.COMPRESSOR_TYPE.compressorSemaphore.release();
> +                                       }
> +                               }
> +
> +                               public int getPriority() {
> +                                       return NativeThread.MIN_PRIORITY;
> +                               }
> +                       }, "Compressor thread for " + currentJob);
> +               }
> +       }
> +}
> \ No newline at end of file
>
> _______________________________________________
> cvs mailing list
> cvs at freenetproject.org
> http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs
>

Reply via email to