On Tuesday 26 August 2008 00:06, nextgens at freenetproject.org wrote:
> Author: nextgens
> Date: 2008-08-25 23:06:22 +0000 (Mon, 25 Aug 2008)
> New Revision: 22164
>
> Modified:
> trunk/freenet/src/freenet/support/io/TempBucketFactory.java
> Log:
> TempBucket: simplify the code even more... but it's still not working as expected!
>
> Modified: trunk/freenet/src/freenet/support/io/TempBucketFactory.java
> ===================================================================
> --- trunk/freenet/src/freenet/support/io/TempBucketFactory.java
> 2008-08-25
22:45:37 UTC (rev 22163)
> +++ trunk/freenet/src/freenet/support/io/TempBucketFactory.java
> 2008-08-25
23:06:22 UTC (rev 22164)
> @@ -65,8 +65,8 @@
> private long currentSize;
> private OutputStream os = null;
> private short osIndex;
> - private volatile boolean shouldResetOS = false;
> - private volatile boolean shouldResetIS = false;
> + private short shouldResetIS = 0;
> + private short isIndex;
> public final Object sync = new Object();
> public final long creationTime;
>
> @@ -76,6 +76,7 @@
> this.currentBucket = cur;
> this.creationTime = now;
> this.osIndex = 0;
> + this.isIndex = 0;
> }
>
> /** A blocking method to force-migrate from a RAMBucket to a
> FileBucket
*/
> @@ -88,14 +89,17 @@
>
> toMigrate = currentBucket;
> Bucket tempFB = _makeFileBucket();
> + if(os != null) {
> + os.flush();
> + os.close();
> + }
> os = tempFB.getOutputStream();
> BucketTools.copyTo(tempFB, os, currentSize);
> if(toMigrate.isReadOnly())
> tempFB.setReadOnly();
> currentBucket = tempFB;
> // We need streams to be reset to point to the
> new bucket
> - shouldResetOS = true;
> - shouldResetIS = true;
> + shouldResetIS = isIndex;
> }
> if(logMINOR)
> Logger.minor(this, "We have migrated
> "+toMigrate.hashCode());
> @@ -114,23 +118,21 @@
>
> public OutputStream getOutputStream() throws IOException {
> synchronized(sync) {
> - shouldResetOS = true;
> + if(os != null)
> + throw new IOException("Only one
> OutputStream per bucket!");
> return new TempBucketOutputStream(++osIndex);
> }
> }
>
> private class TempBucketOutputStream extends OutputStream {
> - private OutputStream currentOS;
> private final short idx;
>
> - TempBucketOutputStream(short idx) {
> + TempBucketOutputStream(short idx) throws IOException {
> this.idx = idx;
> + os = currentBucket.getOutputStream();
> }
>
> private void _maybeMigrateRamBucket(long futureSize)
> throws IOException
{
> - if(idx != osIndex)
> - throw new IOException("Should use the
> new OutputStream!");
> -
> if(isRAMBucket()) {
> boolean shouldMigrate = false;
> boolean isOversized = false;
> @@ -142,7 +144,6 @@
> shouldMigrate = true;
>
> if(shouldMigrate) {
> - Closer.close(currentOS);
> if(logMINOR) {
> if(isOversized)
>
> Logger.minor(this, "The bucket is
over "+SizeUtil.formatSize(maxRAMBucketSize*RAMBUCKET_CONVERSION_FACTOR)+":
we will force-migrate it to disk.");
> @@ -154,22 +155,13 @@
> }
> }
>
> - private void _maybeResetOutputStream() throws
> IOException {
> - if(shouldResetOS) {
> - Closer.close(currentOS);
> - currentOS = (os == null ?
> currentBucket.getOutputStream() : os);
> - shouldResetOS = false;
> - }
> - }
> -
> @Override
> public final void write(int b) throws IOException {
> synchronized(sync) {
> - long futurSize = currentSize + 1;
> - _maybeMigrateRamBucket(futurSize);
> - _maybeResetOutputStream();
> - currentOS.write(b);
> - currentSize = futurSize;
> + long futureSize = currentSize + 1;
> + _maybeMigrateRamBucket(futureSize);
> + os.write(b);
> + currentSize = futureSize;
> if(isRAMBucket()) // We need to
> re-check because it might have
changed!
> _hasTaken(1);
> }
> @@ -180,8 +172,7 @@
> synchronized(sync) {
> long futureSize = currentSize + len;
> _maybeMigrateRamBucket(futureSize);
> - _maybeResetOutputStream();
> - currentOS.write(b, off, len);
> + os.write(b, off, len);
> currentSize = futureSize;
> if(isRAMBucket()) // We need to
> re-check because it might have
changed!
> _hasTaken(len);
> @@ -192,8 +183,7 @@
> public final void flush() throws IOException {
> synchronized(sync) {
> _maybeMigrateRamBucket(currentSize);
> - _maybeResetOutputStream();
> - currentOS.flush();
> + os.flush();
> }
> }
>
> @@ -201,17 +191,16 @@
> public final void close() throws IOException {
> synchronized(sync) {
> _maybeMigrateRamBucket(currentSize);
> - _maybeResetOutputStream();
> - if(currentOS != null) {
> - currentOS.flush();
> - Closer.close(currentOS);
> + if(os != null) {
> + os.flush();
> + os.close();
> }
> }
> }
> }
>
> public synchronized InputStream getInputStream() throws
> IOException {
> - shouldResetIS = true;
> + isIndex++;
> return new TempBucketInputStream(osIndex);
> }
>
> @@ -220,19 +209,20 @@
> private long index = 0;
> private final short idx;
>
> - TempBucketInputStream(short idx) {
> + TempBucketInputStream(short idx) throws IOException {
> this.idx = idx;
> + this.currentIS = currentBucket.getInputStream();
> }
>
> private void _maybeResetInputStream() throws
> IOException {
> if(idx != osIndex)
> throw new IOException("Should use the
> new InputStream!");
>
> - if(shouldResetIS) {
> + if(shouldResetIS > 0) {
> Closer.close(currentIS);
> currentIS =
> currentBucket.getInputStream();
> currentIS.skip(index);
> - shouldResetIS = false;
> + shouldResetIS--;
> }
I don't understand this one. It looks like a single stream is going to reset
and seek on every call to read() until shouldResetIS reaches 0?
You want every input stream that is currently open to be reset once, not one
stream to be reset N times. Probably the best way to implement this is to have
a flag on each InputStream recording whether it has been reset yet. When the
global flag is set, _maybeResetInputStream() will then reset *once* for each
stream, regardless of whether the streams are called in round-robin order.
> }
>
> @@ -283,6 +273,7 @@
> synchronized(sync) {
> _maybeResetInputStream();
> Closer.close(currentIS);
> + isIndex--;
Not sure what's going on here either.
> }
> }
> }
-------------- next part --------------
A non-text attachment was scrubbed...
Name: not available
Type: application/pgp-signature
Size: 827 bytes
Desc: not available
URL:
<https://emu.freenetproject.org/pipermail/devl/attachments/20080830/fed1e426/attachment.pgp>