On Thu, Oct 30, 2008 at 10:20 AM, <nextgens at freenetproject.org> wrote: > Author: nextgens > Date: 2008-10-30 02:20:49 +0000 (Thu, 30 Oct 2008) > New Revision: 23211 > > Added: > trunk/freenet/src/freenet/support/compress/CompressJob.java > trunk/freenet/src/freenet/support/compress/RealCompressor.java > Modified: > trunk/freenet/src/freenet/client/InsertContext.java > trunk/freenet/src/freenet/client/async/SingleFileInserter.java > Log: > More untested code on the serialized compressor. It shouldn't run out of mem > anymore
This change give these error: http://code.bulix.org/vec613-68848?raw > Modified: trunk/freenet/src/freenet/client/InsertContext.java > =================================================================== > --- trunk/freenet/src/freenet/client/InsertContext.java 2008-10-29 23:22:36 > UTC (rev 23210) > +++ trunk/freenet/src/freenet/client/InsertContext.java 2008-10-30 02:20:49 > UTC (rev 23211) > @@ -10,6 +10,7 @@ > import freenet.crypt.RandomSource; > import freenet.support.Executor; > import freenet.support.api.BucketFactory; > +import freenet.support.compress.RealCompressor; > import freenet.support.io.NullPersistentFileTracker; > import freenet.support.io.PersistentFileTracker; > > @@ -34,6 +35,7 @@ > public final USKManager uskManager; > public final BackgroundBlockEncoder backgroundBlockEncoder; > public final Executor executor; > + public final RealCompressor compressor; > > public InsertContext(BucketFactory bf, BucketFactory persistentBF, > PersistentFileTracker tracker, RandomSource random, > int maxRetries, int rnfsToSuccess, int maxThreads, int > splitfileSegmentDataBlocks, int splitfileSegmentCheckBlocks, > @@ -54,6 +56,8 @@ > this.cacheLocalRequests = cacheLocalRequests; > this.backgroundBlockEncoder = blockEncoder; > this.executor = executor; > + this.compressor = new RealCompressor(executor); > + executor.execute(compressor, "Compression scheduler"); > } > > public InsertContext(InsertContext ctx, SimpleEventProducer producer, > boolean forceNonPersistent) { > @@ -73,6 +77,7 @@ > this.cacheLocalRequests = ctx.cacheLocalRequests; > this.backgroundBlockEncoder = ctx.backgroundBlockEncoder; > this.executor = ctx.executor; > + this.compressor = ctx.compressor; > } > > public InsertContext(InsertContext ctx, SimpleEventProducer producer) { > @@ -92,6 +97,7 @@ > this.cacheLocalRequests = ctx.cacheLocalRequests; > this.backgroundBlockEncoder = ctx.backgroundBlockEncoder; > this.executor = ctx.executor; > + this.compressor = ctx.compressor; > } > > } > > Modified: trunk/freenet/src/freenet/client/async/SingleFileInserter.java > =================================================================== > --- trunk/freenet/src/freenet/client/async/SingleFileInserter.java > 2008-10-29 23:22:36 UTC (rev 23210) > +++ trunk/freenet/src/freenet/client/async/SingleFileInserter.java > 2008-10-30 02:20:49 UTC (rev 23211) > @@ -18,15 +18,14 @@ > import freenet.keys.SSKBlock; > import freenet.node.PrioRunnable; > import freenet.support.Logger; > -import freenet.support.OOMHandler; > import freenet.support.SimpleFieldSet; > import freenet.support.api.Bucket; > +import freenet.support.compress.CompressJob; > import freenet.support.compress.CompressionOutputSizeException; > import freenet.support.compress.Compressor.COMPRESSOR_TYPE; > import freenet.support.io.BucketChainBucketFactory; > import freenet.support.io.BucketTools; > import freenet.support.io.NativeThread; > -import java.util.concurrent.Semaphore; > > /** > * Attempt to insert a file. May include metadata. > @@ -35,7 +34,7 @@ > * Attempt to compress the file. Off-thread if it will take a while. > * Then hand it off to SimpleFileInserter. > */ > -class SingleFileInserter implements ClientPutState { > +public class SingleFileInserter implements ClientPutState, CompressJob { > > private static boolean logMINOR; > final BaseClientPutter parent; > @@ -106,45 +105,10 @@ > } > } > // Run off thread in any case > - OffThreadCompressor otc = new OffThreadCompressor(); > - ctx.executor.execute(otc, "Compressor for " + this); > + ctx.compressor.enqueueNewJob(this); > } > > - private class OffThreadCompressor implements PrioRunnable { > - public void run() { > - freenet.support.Logger.OSThread.logPID(this); > - try { > - tryCompress(); > - } catch (InsertException e) { > - cb.onFailure(e, SingleFileInserter.this); > - } catch (OutOfMemoryError e) { > - OOMHandler.handleOOM(e); > - System.err.println("OffThreadCompressor > thread above failed."); > - // Might not be heap, so try anyway > - cb.onFailure(new > InsertException(InsertException.INTERNAL_ERROR, e, null), > SingleFileInserter.this); > - } catch (Throwable t) { > - Logger.error(this, "Caught in OffThreadCompressor: "+t, t); > - System.err.println("Caught in OffThreadCompressor: "+t); > - t.printStackTrace(); > - // Try to fail gracefully > - cb.onFailure(new > InsertException(InsertException.INTERNAL_ERROR, t, null), > SingleFileInserter.this); > - } > - } > - > - public int getPriority() { > - return NativeThread.LOW_PRIORITY; > - } > - } > - > - private void tryCompress() throws InsertException { > - try { > - try { > - COMPRESSOR_TYPE.compressorSemaphore.acquire(); > - } catch (InterruptedException e) { > - // should not happen > - Logger.error(this, "Caught an > InterruptedException:"+e.getMessage(), e); > - throw new > InsertException(InsertException.INTERNAL_ERROR, e, null); > - } > + public void tryCompress() throws InsertException { > // First, determine how small it needs to be > Bucket origData = block.getData(); > Bucket data = origData; > @@ -224,91 +188,111 @@ > if(tryCompress) > ctx.eventProducer.produceEvent(new > FinishedCompressionEvent(bestCodec == null ? -1 : bestCodec.metadataID, > origSize, data.size())); > if(logMINOR) Logger.minor(this, "Compressed > "+origSize+" to "+data.size()+" on "+this); > - } > - > - // Compressed data > - > - // Insert it... > - short codecNumber = bestCodec == null ? -1 : > bestCodec.metadataID; > - long compressedDataSize = data.size(); > - boolean fitsInOneBlockAsIs = bestCodec == null ? > compressedDataSize < blockSize : compressedDataSize < oneBlockCompressedSize; > - boolean fitsInOneCHK = bestCodec == null ? compressedDataSize > < CHKBlock.DATA_LENGTH : compressedDataSize < > CHKBlock.MAX_COMPRESSED_DATA_LENGTH; > + } > + // Compressed data ; now insert it > + // We do it off thread so that RealCompressor can release the > semaphore > + final COMPRESSOR_TYPE fbestCodec = bestCodec; > + final Bucket fdata = data; > + final int foneBlockCompressedSize = oneBlockCompressedSize; > + final int fblockSize = blockSize; > + final long forigSize = origSize; > + final boolean fshouldFreeData = shouldFreeData; > + ctx.executor.execute(new PrioRunnable() { > > - if((fitsInOneBlockAsIs || fitsInOneCHK) && > block.getData().size() > Integer.MAX_VALUE) > - throw new > InsertException(InsertException.INTERNAL_ERROR, "2GB+ should not encode to > one block!", null); > + public int getPriority() { > + return NativeThread.NORM_PRIORITY; > + } > > - boolean noMetadata = ((block.clientMetadata == null) || > block.clientMetadata.isTrivial()) && targetFilename == null; > - if(noMetadata && archiveType == null) { > - if(fitsInOneBlockAsIs) { > - // Just insert it > - ClientPutState bi = > - createInserter(parent, data, > codecNumber, block.desiredURI, ctx, cb, metadata, > (int)block.getData().size(), -1, getCHKOnly, true, true); > - cb.onTransition(this, bi); > - bi.schedule(); > - cb.onBlockSetFinished(this); > + public void run() { > + insert(fbestCodec, fdata, > foneBlockCompressedSize, fblockSize, forigSize, fshouldFreeData); > + } > + }, "Insert thread for "+this); > + } > + > + private void insert(COMPRESSOR_TYPE bestCodec, Bucket data, int > oneBlockCompressedSize, int blockSize, long origSize, boolean shouldFreeData) > { > + try { > + // Insert it... > + short codecNumber = bestCodec == null ? -1 : > bestCodec.metadataID; > + long compressedDataSize = data.size(); > + boolean fitsInOneBlockAsIs = bestCodec == null ? > compressedDataSize < blockSize : compressedDataSize < oneBlockCompressedSize; > + boolean fitsInOneCHK = bestCodec == null ? > compressedDataSize < CHKBlock.DATA_LENGTH : compressedDataSize < > CHKBlock.MAX_COMPRESSED_DATA_LENGTH; > + > + if((fitsInOneBlockAsIs || fitsInOneCHK) && > block.getData().size() > Integer.MAX_VALUE) > + throw new > InsertException(InsertException.INTERNAL_ERROR, "2GB+ should not encode to > one block!", null); > + > + boolean noMetadata = ((block.clientMetadata == null) > || block.clientMetadata.isTrivial()) && targetFilename == null; > + if(noMetadata && archiveType == null) > + if(fitsInOneBlockAsIs) { > + // Just insert it > + ClientPutState bi = > + createInserter(parent, data, > codecNumber, block.desiredURI, ctx, cb, metadata, (int) > block.getData().size(), -1, getCHKOnly, true, true); > + cb.onTransition(this, bi); > + bi.schedule(); > + cb.onBlockSetFinished(this); > + return; > + } > + if(fitsInOneCHK) { > + // Insert single block, then insert pointer > to it > + if(reportMetadataOnly) { > + SingleBlockInserter dataPutter = new > SingleBlockInserter(parent, data, codecNumber, FreenetURI.EMPTY_CHK_URI, ctx, > cb, metadata, (int) origSize, -1, getCHKOnly, true, true, token); > + Metadata meta = > makeMetadata(archiveType, bestCodec, dataPutter.getURI()); > + cb.onMetadata(meta, this); > + cb.onTransition(this, dataPutter); > + dataPutter.schedule(); > + cb.onBlockSetFinished(this); > + } else { > + MultiPutCompletionCallback mcb = > + new > MultiPutCompletionCallback(cb, parent, token); > + SingleBlockInserter dataPutter = new > SingleBlockInserter(parent, data, codecNumber, FreenetURI.EMPTY_CHK_URI, ctx, > mcb, metadata, (int) origSize, -1, getCHKOnly, true, false, token); > + Metadata meta = > makeMetadata(archiveType, bestCodec, dataPutter.getURI()); > + Bucket metadataBucket; > + try { > + metadataBucket = > BucketTools.makeImmutableBucket(ctx.bf, meta.writeToByteArray()); > + } catch(IOException e) { > + Logger.error(this, "Caught " > + e, e); > + throw new > InsertException(InsertException.BUCKET_ERROR, e, null); > + } catch(MetadataUnresolvedException > e) { > + // Impossible, we're not > inserting a manifest. > + Logger.error(this, "Caught " > + e, e); > + throw new > InsertException(InsertException.INTERNAL_ERROR, "Got > MetadataUnresolvedException in SingleFileInserter: " + e.toString(), null); > + } > + ClientPutState metaPutter = > createInserter(parent, metadataBucket, (short) -1, block.desiredURI, ctx, > mcb, true, (int) origSize, -1, getCHKOnly, true, false); > + mcb.addURIGenerator(metaPutter); > + mcb.add(dataPutter); > + cb.onTransition(this, mcb); > + Logger.minor(this, "" + mcb + " : > data " + dataPutter + " meta " + metaPutter); > + mcb.arm(); > + dataPutter.schedule(); > + if(metaPutter instanceof > SingleBlockInserter) > + ((SingleBlockInserter) > metaPutter).encode(); > + metaPutter.schedule(); > + cb.onBlockSetFinished(this); > + } > return; > } > - } > - if (fitsInOneCHK) { > - // Insert single block, then insert pointer to it > + // Otherwise the file is too big to fit into one block > + // We therefore must make a splitfile > + // Job of SplitHandler: when the splitinserter has > the metadata, > + // insert it. Then when the splitinserter has > finished, and the > + // metadata insert has finished too, tell the master > callback. > if(reportMetadataOnly) { > - SingleBlockInserter dataPutter = new > SingleBlockInserter(parent, data, codecNumber, FreenetURI.EMPTY_CHK_URI, ctx, > cb, metadata, (int)origSize, -1, getCHKOnly, true, true, token); > - Metadata meta = makeMetadata(archiveType, > bestCodec, dataPutter.getURI()); > - cb.onMetadata(meta, this); > - cb.onTransition(this, dataPutter); > - dataPutter.schedule(); > - cb.onBlockSetFinished(this); > + SplitFileInserter sfi = new > SplitFileInserter(parent, cb, data, bestCodec, origSize, > block.clientMetadata, ctx, getCHKOnly, metadata, token, archiveType, > shouldFreeData); > + cb.onTransition(this, sfi); > + sfi.start(); > + if(earlyEncode) > + sfi.forceEncode(); > } else { > - MultiPutCompletionCallback mcb = > - new MultiPutCompletionCallback(cb, > parent, token); > - SingleBlockInserter dataPutter = new > SingleBlockInserter(parent, data, codecNumber, FreenetURI.EMPTY_CHK_URI, ctx, > mcb, metadata, (int)origSize, -1, getCHKOnly, true, false, token); > - Metadata meta = makeMetadata(archiveType, > bestCodec, dataPutter.getURI()); > - Bucket metadataBucket; > - try { > - metadataBucket = > BucketTools.makeImmutableBucket(ctx.bf, meta.writeToByteArray()); > - } catch (IOException e) { > - Logger.error(this, "Caught "+e, e); > - throw new > InsertException(InsertException.BUCKET_ERROR, e, null); > - } catch (MetadataUnresolvedException e) { > - // Impossible, we're not inserting a > manifest. > - Logger.error(this, "Caught "+e, e); > - throw new > InsertException(InsertException.INTERNAL_ERROR, "Got > MetadataUnresolvedException in SingleFileInserter: "+e.toString(), null); > - } > - ClientPutState metaPutter = > createInserter(parent, metadataBucket, (short) -1, block.desiredURI, ctx, > mcb, true, (int)origSize, -1, getCHKOnly, true, false); > - mcb.addURIGenerator(metaPutter); > - mcb.add(dataPutter); > - cb.onTransition(this, mcb); > - Logger.minor(this, ""+mcb+" : data > "+dataPutter+" meta "+metaPutter); > - mcb.arm(); > - dataPutter.schedule(); > - if(metaPutter instanceof SingleBlockInserter) > - > ((SingleBlockInserter)metaPutter).encode(); > - metaPutter.schedule(); > - cb.onBlockSetFinished(this); > + SplitHandler sh = new SplitHandler(); > + SplitFileInserter sfi = new > SplitFileInserter(parent, sh, data, bestCodec, origSize, > block.clientMetadata, ctx, getCHKOnly, metadata, token, archiveType, > shouldFreeData); > + sh.sfi = sfi; > + cb.onTransition(this, sh); > + sfi.start(); > + if(earlyEncode) > + sfi.forceEncode(); > } > - return; > + } catch(InsertException e) { > + onFailure(e, this); > } > - // Otherwise the file is too big to fit into one block > - // We therefore must make a splitfile > - // Job of SplitHandler: when the splitinserter has the > metadata, > - // insert it. Then when the splitinserter has finished, and > the > - // metadata insert has finished too, tell the master callback. > - if(reportMetadataOnly) { > - SplitFileInserter sfi = new SplitFileInserter(parent, > cb, data, bestCodec, origSize, block.clientMetadata, ctx, getCHKOnly, > metadata, token, archiveType, shouldFreeData); > - cb.onTransition(this, sfi); > - sfi.start(); > - if(earlyEncode) sfi.forceEncode(); > - } else { > - SplitHandler sh = new SplitHandler(); > - SplitFileInserter sfi = new SplitFileInserter(parent, > sh, data, bestCodec, origSize, block.clientMetadata, ctx, getCHKOnly, > metadata, token, archiveType, shouldFreeData); > - sh.sfi = sfi; > - cb.onTransition(this, sh); > - sfi.start(); > - if(earlyEncode) sfi.forceEncode(); > - } > - } finally { > - COMPRESSOR_TYPE.compressorSemaphore.release(); > - } > } > > private Metadata makeMetadata(ARCHIVE_TYPE archiveType, > COMPRESSOR_TYPE codec, FreenetURI uri) { > @@ -722,4 +706,11 @@ > public SimpleFieldSet getProgressFieldset() { > return null; > } > + > + public void onFailure(InsertException e, ClientPutState c) { > + if(cb != null) > + cb.onFailure(e, c); > + else > + Logger.error(this, "The callback is null but we have > onFailure to call!"); > + } > } > > Added: trunk/freenet/src/freenet/support/compress/CompressJob.java > =================================================================== > --- trunk/freenet/src/freenet/support/compress/CompressJob.java > (rev 0) > +++ trunk/freenet/src/freenet/support/compress/CompressJob.java 2008-10-30 > 02:20:49 UTC (rev 23211) > @@ -0,0 +1,9 @@ > +package freenet.support.compress; > + > +import freenet.client.InsertException; > +import freenet.client.async.ClientPutState; > + > +public interface CompressJob { > + public abstract void tryCompress() throws InsertException; > + public abstract void onFailure(InsertException e, ClientPutState c); > +} > > Added: trunk/freenet/src/freenet/support/compress/RealCompressor.java > =================================================================== > --- trunk/freenet/src/freenet/support/compress/RealCompressor.java > (rev 0) > +++ trunk/freenet/src/freenet/support/compress/RealCompressor.java > 2008-10-30 02:20:49 UTC (rev 23211) > @@ -0,0 +1,86 @@ > +/* This code is part of Freenet. It is distributed under the GNU General > + * Public License, version 2 (or at your option any later version). See > + * http://www.gnu.org/ for further details of the GPL. */ > +package freenet.support.compress; > + > +import freenet.client.InsertException; > +import freenet.node.PrioRunnable; > +import freenet.support.Executor; > +import freenet.support.Logger; > +import freenet.support.OOMHandler; > +import freenet.support.io.NativeThread; > +import java.util.LinkedList; > + > +public class RealCompressor implements PrioRunnable { > + > + private final Executor exec; > + private static final LinkedList<CompressJob> _awaitingJobs = new > LinkedList<CompressJob>(); > + > + public RealCompressor(Executor e) { > + this.exec = e; > + } > + > + public int getPriority() { > + return NativeThread.HIGH_PRIORITY; > + } > + > + public synchronized void enqueueNewJob(CompressJob j) { > + _awaitingJobs.add(j); > + notifyAll(); > + } > + > + public void run() { > + Logger.normal(this, "Starting RealCompressor"); > + while(true) { > + CompressJob currentJob = null; > + try { > + synchronized(this) { > + wait(); > + currentJob = _awaitingJobs.poll(); > + } > + if(currentJob == null) > + continue; > + > Compressor.COMPRESSOR_TYPE.compressorSemaphore.acquire(); > + } catch(InterruptedException e) { > + Logger.error(this, "caught: "+e.getMessage(), > e); > + continue; > + } > + > + final CompressJob finalJob = currentJob; > + exec.execute(new PrioRunnable() { > + public void run() { > + > freenet.support.Logger.OSThread.logPID(this); > + try { > + while(true) { > + try { > + > finalJob.tryCompress(); > + } > catch(InsertException e) { > + > finalJob.onFailure(e, null); > + } > catch(OutOfMemoryError e) { > + > OOMHandler.handleOOM(e); > + > System.err.println("OffThreadCompressor thread above failed."); > + // Might not > be heap, so try anyway > + > finalJob.onFailure(new InsertException(InsertException.INTERNAL_ERROR, e, > null), null); > + } catch(Throwable t) { > + > Logger.error(this, "Caught in OffThreadCompressor: " + t, t); > + > System.err.println("Caught in OffThreadCompressor: " + t); > + > t.printStackTrace(); > + // Try to > fail gracefully > + > finalJob.onFailure(new InsertException(InsertException.INTERNAL_ERROR, t, > null), null); > + } > + > + } > + } catch(Throwable t) { > + Logger.error(this, "Caught " > + t + " in " + this, t); > + } finally { > + > Compressor.COMPRESSOR_TYPE.compressorSemaphore.release(); > + } > + } > + > + public int getPriority() { > + return NativeThread.MIN_PRIORITY; > + } > + }, "Compressor thread for " + currentJob); > + } > + } > +} > \ No newline at end of file > > _______________________________________________ > cvs mailing list > cvs at freenetproject.org > http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs >
