Uhm, I remember we had some problems with nio some time (years) ago; what
are the reasons for switching from io to nio again?
j16sdiz at freenetproject.org wrote:
> Author: j16sdiz
> Date: 2008-04-26 03:57:34 +0000 (Sat, 26 Apr 2008)
> New Revision: 19567
>
> Modified:
> trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java
> Log:
> use FileChannel.read()/write() in BDBFS
>
>
> Modified: trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java
> ===================================================================
> --- trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java
> 2008-04-25 23:18:06 UTC (rev 19566)
> +++ trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java
> 2008-04-26 03:57:34 UTC (rev 19567)
> @@ -4,6 +4,8 @@
> import java.io.File;
> import java.io.IOException;
> import java.io.RandomAccessFile;
> +import java.nio.ByteBuffer;
> +import java.nio.channels.FileChannel;
> import java.util.ArrayList;
> import java.util.Arrays;
> import java.util.List;
> @@ -79,6 +81,9 @@
> private RandomAccessFile storeRAF;
> private RandomAccessFile keysRAF;
> private RandomAccessFile lruRAF;
> + private FileChannel storeFC;
> + private FileChannel keysFC;
> + private FileChannel lruFC;
> private final SortedLongSet freeBlocks;
> private final String name;
> /** Callback which translates records to blocks and back, specifies the
> size of blocks etc. */
> @@ -252,17 +257,20 @@
> if(!storeFile.createNewFile())
> throw new IOException("Can't create a
> new file " + storeFile + " !");
> storeRAF = new RandomAccessFile(storeFile,"rw");
> + storeFC = storeRAF.getChannel();
>
> if(!lruFile.exists())
> if(!lruFile.createNewFile())
> throw new IOException("Can't create a
> new file " + lruFile + " !");
> lruRAF = new RandomAccessFile(lruFile,"rw");
> + lruFC = lruRAF.getChannel();
>
> if(keysFile != null) {
> if(!keysFile.exists())
> if(!keysFile.createNewFile())
> throw new IOException("Can't
> create a new file " + keysFile + " !");
> keysRAF = new RandomAccessFile(keysFile,"rw");
> + keysFC = keysRAF.getChannel();
> } else keysRAF = null;
>
> if (wipe) {
> @@ -569,7 +577,7 @@
>
> WrapperManager.signalStarting((int)Math.min(Integer.MAX_VALUE,
> (5*60*1000 + wantedMoveNums.length*1000L + alreadyDropped.size() * 100L)));
> // 1 per second
>
> - byte[] buf = new byte[headerBlockSize + dataBlockSize];
> + ByteBuffer buf = ByteBuffer.allocate(headerBlockSize +
> dataBlockSize);
> long lruValue;
> byte[] keyBuf = new byte[keyLength];
> t = null;
> @@ -622,18 +630,21 @@
> boolean readLRU = false;
> boolean readKey = false;
> try {
> - storeRAF.seek(entry * (headerBlockSize
> + dataBlockSize));
> - storeRAF.readFully(buf);
> + buf.rewind();
> + do {
> + int byteRead =
> storeFC.read(buf, entry * (headerBlockSize + dataBlockSize) + buf.position());
> + if (byteRead == -1)
> + throw new
> EOFException();
> + } while (buf.hasRemaining());
> + buf.flip();
> lruValue = 0;
> if(lruRAF.length() > ((entry + 1) * 8))
> {
> readLRU = true;
> - lruRAF.seek(entry * 8);
> - lruValue = lruRAF.readLong();
> + lruValue = fcReadLRU(entry);
> }
> if(keysRAF != null && keysRAF.length()
> > ((entry + 1) * keyLength)) {
> readKey = true;
> - keysRAF.seek(entry * keyLength);
> - keysRAF.readFully(keyBuf);
> + fcReadKey(entry, keyBuf);
> }
> } catch (EOFException e) {
> System.err.println("Was reading
> "+wantedBlock+" to write to "+unwantedBlock);
> @@ -642,15 +653,16 @@
> throw e;
> }
> entry = unwantedBlock.longValue();
> - storeRAF.seek(entry * (headerBlockSize +
> dataBlockSize));
> - storeRAF.write(buf);
> + do {
> + int byteWritten = storeFC.write(buf,
> entry * (headerBlockSize + dataBlockSize) + buf.position());
> + if (byteWritten == -1)
> + throw new EOFException();
> + } while (buf.hasRemaining());
> if(readLRU) {
> - lruRAF.seek(entry * 8);
> - lruRAF.writeLong(lruValue);
> + fcWriteLRU(entry, lruValue);
> }
> if(readKey) {
> - keysRAF.seek(entry * keyLength);
> - keysRAF.write(keyBuf);
> + fcWriteKey(entry, keyBuf);
> }
>
> // Update the database w.r.t. the old block.
> @@ -1158,11 +1170,7 @@
> byte[] header = new byte[headerBlockSize];
> byte[] data = new byte[dataBlockSize];
> try {
> - synchronized(storeRAF) {
> -
> storeRAF.seek(storeBlock.offset*(long)(dataBlockSize+headerBlockSize));
> - storeRAF.readFully(header);
> - storeRAF.readFully(data);
> - }
> + fcReadStore(storeBlock.offset, header,
> data);
> } catch (EOFException e) {
> Logger.error(this, "No block");
> c.close();
> @@ -1201,16 +1209,15 @@
>
> keysDB.put(t,routingkeyDBE,blockDBE);
> if(fullKey == null)
> fullKey =
> block.getFullKey();
> - synchronized(storeRAF) {
> +
> if(keysRAF != null) {
> -
> keysRAF.seek(storeBlock.offset * keyLength);
> -
> keysRAF.write(fullKey);
> +
> fcWriteKey(storeBlock.offset, fullKey);
> if(logDEBUG)
>
> Logger.debug(this, "Written full key length "+fullKey.length+" to block
> "+storeBlock.offset+" at "+(storeBlock.offset * keyLength)+" for "+callback);
> } else if(logDEBUG) {
>
> Logger.debug(this, "Not writing full key length "+fullKey.length+" for block
> "+storeBlock.offset+" for "+callback);
> }
> - }
> +
> } catch (DatabaseException e) {
> Logger.error(this, "Caught
> database exception "+e+" while replacing element");
> addFreeBlock(storeBlock.offset,
> true, "Bogus key");
> @@ -1237,10 +1244,7 @@
> c = null;
> t.commit();
> t = null;
> - synchronized(storeRAF) {
> - lruRAF.seek(storeBlock.offset *
> 8);
> -
> lruRAF.writeLong(storeBlock.recentlyUsed);
> - }
> + fcWriteLRU(storeBlock.offset,
> storeBlock.recentlyUsed);
> } else {
> c.close();
> c = null;
> @@ -1258,16 +1262,14 @@
> synchronized(this) {
> misses++;
> }
> - synchronized(storeRAF) {
> +
> // Clear the key in the keys file.
> byte[] buf = new byte[keyLength];
> for(int i=0;i<buf.length;i++) buf[i] =
> 0; // FIXME unnecessary?
> if(keysRAF != null) {
> - keysRAF.seek(storeBlock.offset
> * keyLength);
> - keysRAF.write(buf);
> + fcWriteKey(storeBlock.offset,
> buf);
> }
> - }
> -
> +
> keysDB.delete(t, routingkeyDBE);
>
> // Insert the block into the index with a
> random key, so that it's part of the LRU.
> @@ -1394,14 +1396,9 @@
>
> StoreBlock storeBlock = (StoreBlock)
> storeBlockTupleBinding.entryToObject(blockDBE);
>
> - synchronized(storeRAF) {
> -
> storeRAF.seek(storeBlock.offset*(long)(dataBlockSize+headerBlockSize));
> - storeRAF.write(header);
> - storeRAF.write(data);
> - if(keysRAF != null) {
> - keysRAF.seek(storeBlock.offset *
> keyLength);
> - keysRAF.write(fullKey);
> - }
> + fcWriteStore(storeBlock.offset, header, data);
> + if (keysRAF != null) {
> + fcWriteKey(storeBlock.offset, fullKey);
> }
>
> // Unlock record.
> @@ -1522,22 +1519,17 @@
> DatabaseEntry blockDBE = new DatabaseEntry();
> storeBlockTupleBinding.objectToEntry(storeBlock, blockDBE);
> keysDB.put(t,routingkeyDBE,blockDBE);
> - synchronized(storeRAF) {
> -
> storeRAF.seek(storeBlock.getOffset()*(long)(dataBlockSize+headerBlockSize));
> - storeRAF.write(header);
> - storeRAF.write(data);
> - lruRAF.seek(storeBlock.getOffset() * 8);
> - lruRAF.writeLong(storeBlock.recentlyUsed);
> - if(keysRAF != null) {
> - keysRAF.seek(storeBlock.getOffset() *
> keyLength);
> - keysRAF.write(fullKey);
> - }
> +
> + fcWriteStore(storeBlock.getOffset(), header, data);
> + fcWriteLRU( storeBlock.getOffset(),storeBlock.recentlyUsed);
> + if (keysRAF != null)
> + fcWriteKey(storeBlock.getOffset(), fullKey);
> + synchronized (this) {
> writes++;
> }
> }
>
> private boolean writeNewBlock(long blockNum, byte[] header, byte[]
> data, Transaction t, DatabaseEntry routingkeyDBE, byte[] fullKey) throws
> DatabaseException, IOException {
> - long byteOffset = blockNum*(dataBlockSize+headerBlockSize);
> StoreBlock storeBlock = new StoreBlock(this, blockNum);
> long lruValue = storeBlock.recentlyUsed;
> DatabaseEntry blockDBE = new DatabaseEntry();
> @@ -1562,31 +1554,20 @@
> throw e;
> }
> }
> - synchronized(storeRAF) {
> - try {
> - storeRAF.seek(byteOffset);
> - } catch (IOException ioe) {
> - if(byteOffset > (2l*1024*1024*1024)) {
> - Logger.error(this, "Environment does
> not support files bigger than 2 GB?");
> - System.out.println("Environment does
> not support files bigger than 2 GB? (exception to follow)");
> - }
> - Logger.error(this, "Caught IOException on
> storeRAF.seek("+byteOffset+ ')');
> - throw ioe;
> - }
> - storeRAF.write(header);
> - storeRAF.write(data);
> - lruRAF.seek(blockNum * 8);
> - lruRAF.writeLong(lruValue);
> +
> + fcWriteStore(blockNum, header, data);
> + fcWriteLRU(blockNum, lruValue);
> if(keysRAF != null) {
> - keysRAF.seek(blockNum * keyLength);
> - keysRAF.write(fullKey);
> + fcWriteKey(blockNum, fullKey);
> if(logDEBUG)
> Logger.debug(this, "Written full key
> length "+fullKey.length+" to block "+blockNum+" at "+(blockNum * keyLength)+"
> for "+callback);
> } else if(logDEBUG) {
> Logger.debug(this, "Not writing full key length
> "+fullKey.length+" for block "+blockNum+" for "+callback);
> }
> + synchronized (this) {
> writes++;
> }
> +
> return true;
> }
>
> @@ -1802,7 +1783,7 @@
> private static void flushAndCloseRAF(RandomAccessFile file) {
> try {
> if (file != null)
> - file.getFD().sync();
> + file.getChannel().force(true);
> } catch (IOException e) {
> // ignore
> }
> @@ -2144,6 +2125,67 @@
> return db;
> }
>
> + private void fcWriteLRU(long entry, long data) throws IOException {
> + ByteBuffer bf = ByteBuffer.allocateDirect(8);
> + bf.putLong(data);
> + bf.flip();
> + do {
> + int byteWritten = lruFC.write(bf, entry * 8 +
> bf.position());
> + if (byteWritten == -1)
> + throw new EOFException();
> + } while (bf.hasRemaining());
> + }
> + private long fcReadLRU(long entry) throws IOException {
> + ByteBuffer bf = ByteBuffer.allocateDirect(8);
> + do {
> + int byteRead = lruFC.read(bf, entry * 8 +
> bf.position());
> + if (byteRead == -1)
> + throw new EOFException();
> + } while (bf.hasRemaining());
> + bf.flip();
> + return bf.getLong();
> + }
> + private void fcReadKey(long entry, byte[] data) throws IOException {
> + ByteBuffer bf = ByteBuffer.wrap(data);
> + do {
> + int byteRead = keysFC.read(bf, entry * keyLength +
> bf.position());
> + if (byteRead == -1)
> + throw new EOFException();
> + } while (bf.hasRemaining());
> + }
> + private void fcWriteKey(long entry, byte[] data) throws IOException {
> + ByteBuffer bf = ByteBuffer.wrap(data);
> + do {
> + int byteWritten = keysFC.write(bf, entry * keyLength +
> bf.position());
> + if (byteWritten == -1)
> + throw new EOFException();
> + } while (bf.hasRemaining());
> + }
> + private void fcWriteStore(long entry, byte[] header, byte[] data)
> throws IOException {
> + ByteBuffer bf = ByteBuffer.allocateDirect(headerBlockSize +
> dataBlockSize);
> + bf.put(header);
> + bf.put(data);
> + bf.flip();
> + do {
> + int byteWritten = storeFC.write(bf, (headerBlockSize +
> dataBlockSize) * entry + bf.position());
> + if (byteWritten == -1)
> + throw new EOFException();
> + } while (bf.hasRemaining());
> + }
> + private void fcReadStore(long entry,byte[] header, byte[] data ) throws
> IOException {
> + ByteBuffer bf = ByteBuffer.allocateDirect(headerBlockSize +
> dataBlockSize);
> +
> + do {
> + int dataRead = storeFC.read(bf, (headerBlockSize +
> dataBlockSize) * entry);
> + if (dataRead == -1)
> + throw new EOFException();
> + } while (bf.hasRemaining());
> +
> + bf.flip();
> + bf.get(header);
> + bf.get(data);
> + }
> +
> public void handleOOM() throws Exception {
> if (storeRAF != null)
> storeRAF.getFD().sync();
>
> _______________________________________________
> cvs mailing list
> cvs at freenetproject.org
> http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs
>
>