Author: chirino
Date: Thu Jan 19 00:59:10 2012
New Revision: 1233145
URL: http://svn.apache.org/viewvc?rev=1233145&view=rev
Log:
Fixes APLO-128 : Simplify the leveldb-store's log record format so that every
record has a checksum which can be used to quickly validate all read data in
case you're paranoid about data corruption in your FS.
Modified:
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreDTO.java
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
Modified:
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreDTO.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreDTO.java?rev=1233145&r1=1233144&r2=1233145&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreDTO.java
(original)
+++
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreDTO.java
Thu Jan 19 00:59:10 2012
@@ -49,9 +49,6 @@ public class LevelDBStoreDTO extends Sto
@XmlAttribute(name="log_size")
public String log_size;
- @XmlAttribute(name="log__write_buffer_size")
- public String log_write_buffer_size;
-
@XmlAttribute(name="index_max_open_files")
public Integer index_max_open_files;
@@ -96,8 +93,6 @@ public class LevelDBStoreDTO extends Sto
return false;
if (index_factory != null ? !index_factory.equals(that.index_factory)
: that.index_factory != null) return false;
if (log_size != null ? !log_size.equals(that.log_size) : that.log_size
!= null) return false;
- if (log_write_buffer_size != null ?
!log_write_buffer_size.equals(that.log_write_buffer_size) :
that.log_write_buffer_size != null)
- return false;
if (paranoid_checks != null ?
!paranoid_checks.equals(that.paranoid_checks) : that.paranoid_checks != null)
return false;
if (read_threads != null ? !read_threads.equals(that.read_threads) :
that.read_threads != null) return false;
@@ -118,7 +113,6 @@ public class LevelDBStoreDTO extends Sto
result = 31 * result + (paranoid_checks != null ?
paranoid_checks.hashCode() : 0);
result = 31 * result + (verify_checksums != null ?
verify_checksums.hashCode() : 0);
result = 31 * result + (log_size != null ? log_size.hashCode() : 0);
- result = 31 * result + (log_write_buffer_size != null ?
log_write_buffer_size.hashCode() : 0);
result = 31 * result + (index_max_open_files != null ?
index_max_open_files.hashCode() : 0);
result = 31 * result + (index_block_restart_interval != null ?
index_block_restart_interval.hashCode() : 0);
result = 31 * result + (index_write_buffer_size != null ?
index_write_buffer_size.hashCode() : 0);
Modified:
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala?rev=1233145&r1=1233144&r2=1233145&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
Thu Jan 19 00:59:10 2012
@@ -172,6 +172,9 @@ class LevelDBClient(store: LevelDBStore)
import FileSupport._
def dispatchQueue = store.dispatch_queue
+
+ implicit def toByteArray(buf:Buffer):Array[Byte] = buf.toByteArray
+ implicit def toBuffer(buf:Array[Byte]):Buffer = new Buffer(buf)
/////////////////////////////////////////////////////////////////////
//
@@ -240,7 +243,7 @@ class LevelDBClient(store: LevelDBStore)
config.index_max_open_files.foreach( index_options.maxOpenFiles(_) )
config.index_block_restart_interval.foreach(
index_options.blockRestartInterval(_) )
- config.paranoid_checks.foreach( index_options.paranoidChecks(_) )
+ index_options.paranoidChecks(config.paranoid_checks.getOrElse(true))
Option(config.index_write_buffer_size).map(MemoryPropertyEditor.parse(_).toInt).foreach(
index_options.writeBufferSize(_) )
Option(config.index_block_size).map(MemoryPropertyEditor.parse(_).toInt).foreach(
index_options.blockSize(_) )
Option(config.index_compression).foreach(x =>
index_options.compressionType( x match {
@@ -255,8 +258,9 @@ class LevelDBClient(store: LevelDBStore)
})
log = create_log
- log.write_buffer_size =
Option(config.log_write_buffer_size).map(MemoryPropertyEditor.parse(_).toInt).getOrElse(1024*1024*4)
- log.log_size = log_size
+ log.sync = sync
+ log.logSize = log_size
+ log.paranoidChecks = index_options.paranoidChecks()
log.on_log_rotate = ()=> {
// lets queue a request to checkpoint when
// the logs rotate.. queue it on the GC thread since GC's lock
@@ -660,7 +664,7 @@ class LevelDBClient(store: LevelDBStore)
}
if( sync_needed && sync ) {
appender.flush
- appender.sync
+ appender.force
}
}
}
@@ -711,6 +715,7 @@ class LevelDBClient(store: LevelDBStore)
log.read(pos, len).map { data =>
val rc:MessageRecord = data
rc.locator = new AtomicReference[Array[Byte]](locator_data)
+ assert( rc.protocol!=null )
rc
}
}
@@ -741,6 +746,7 @@ class LevelDBClient(store: LevelDBStore)
log.read(pos, len).map { data =>
val rc:MessageRecord = data
rc.locator = new AtomicReference[Array[Byte]](locator_data)
+ assert( rc.protocol!=null )
rc
}
}
Modified:
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala?rev=1233145&r1=1233144&r2=1233145&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
Thu Jan 19 00:59:10 2012
@@ -19,53 +19,41 @@ package org.apache.activemq.apollo.broke
import java.{lang=>jl}
import java.{util=>ju}
-import org.apache.activemq.apollo.util._
-import org.fusesource.hawtbuf.DataByteArrayOutputStream
-import java.io._
import java.util.zip.CRC32
import java.util.Map.Entry
-import java.util.Arrays
-import collection.mutable.{HashMap, HashSet}
import collection.immutable.TreeMap
+import org.fusesource.hawtdispatch.BaseRetained
import java.util.concurrent.atomic.AtomicLong
-import java.nio.ByteBuffer
+import java.io._
+import org.apache.activemq.apollo.util.FileSupport._
+import org.apache.activemq.apollo.util.{Log, LRUCache}
+import org.fusesource.hawtbuf.{DataByteArrayInputStream,
DataByteArrayOutputStream, Buffer}
-object RecordLog {
+object RecordLog extends Log {
// The log files contain a sequence of variable length log records:
- // record :=
- // '*L' : int8*2 // 2 byte constant
+ // record := header + data
+ //
+ // header :=
+ // '*' : int8 // Start of Record Magic
+ // kind : int8 // Help identify content type of the data.
// checksum : uint32 // crc32 of the data[]
// length : uint32 // the length of the data
- // data : int8*length
- //
- // The log records are used to aggregate multiple data records
- // as a single write to the file system.
-
- //
- // The data is composed of multiple records too:
- // data :=
- // kind : int8
- // length : varInt
- // body : int8*length
- //
- // The kind field is an aid to the app layer. It cannot be set to
- // '*'.
-
- val LOG_HEADER_PREFIX = Array('*', 'L').map(_.toByte)
- val LOG_HEADER_SIZE = 10 // BATCH_HEADER_PREFIX (2) + checksum (4) + length
(4)
+ val LOG_HEADER_PREFIX = '*'.toByte
+ val LOG_HEADER_SIZE = 10
+ val BUFFER_SIZE = 1024
}
-case class RecordLog(directory: File, log_suffix:String) {
- import FileSupport._
+case class RecordLog(directory: File, logSuffix:String) {
import RecordLog._
directory.mkdirs()
- var write_buffer_size = 1024 * 1024 * 4
- var log_size = 1024 * 1024 * 100
- private var current_appender:LogAppender = _
+ var logSize = 1024 * 1024 * 100
+ var current_appender:LogAppender = _
+ var paranoidChecks = false
+ var sync = false
case class LogInfo(file:File, position:Long, length:AtomicLong) {
def limit = position+length.get
@@ -79,165 +67,241 @@ case class RecordLog(directory: File, lo
// We can't delete the current appender.
if( current_appender.start != id ) {
log_infos.get(id).foreach { info =>
- on_delete(info.file)
+ onDelete(info.file)
log_infos = log_infos.filterNot(_._1 == id)
}
}
}
}
- protected def on_delete(file:File) = {
+ protected def onDelete(file:File) = {
file.delete()
}
- class LogAppender(val file:File, val start:Long) {
+ def checksum(data: Buffer): Int = {
+ val checksum = new CRC32
+ checksum.update(data.data, data.offset, data.length)
+ (checksum.getValue & 0xFFFFFFFF).toInt
+ }
- val fos = new FileOutputStream(file)
- def channel = fos.getChannel
- def os:OutputStream = fos
+ class LogAppender(file:File, start:Long) extends LogReader(file, start) {
- val outbound = new DataByteArrayOutputStream()
+ override def open = new RandomAccessFile(file, "rw")
+
+ override def dispose() = {
+ force
+ super.dispose()
+ }
- var batch_length = 0
val length = new AtomicLong(0)
- var limit = start
+
+ def limit = start+length.get()
// set the file size ahead of time so that we don't have to sync the file
// meta-data on every log sync.
- channel.position(log_size)
- channel.write(ByteBuffer.wrap(Array(0.toByte)))
+ channel.position(logSize-1)
+ channel.write(new Buffer(1).toByteBuffer)
channel.force(true)
channel.position(0)
- def sync = {
+ val os = new DataByteArrayOutputStream((BUFFER_SIZE)+LOG_HEADER_PREFIX)
+
+ def force = {
// only need to update the file metadata if the file size changes..
- channel.force(length.get() > log_size)
+ flush
+ if(sync) {
+ channel.force(length.get() > logSize)
+ }
}
- def flush {
- if( batch_length!= 0 ) {
-
- // Update the buffer with the log header info now that we
- // can calc the length and checksum info
- val buffer = outbound.toBuffer
-
- assert(buffer.length()==LOG_HEADER_SIZE+batch_length)
-
- outbound.reset()
- outbound.write(LOG_HEADER_PREFIX)
-
- val checksum = new CRC32
- checksum.update(buffer.data, buffer.offset + LOG_HEADER_SIZE,
buffer.length - LOG_HEADER_SIZE)
- var actual_checksum = (checksum.getValue & 0xFFFFFFFF).toInt
-
- outbound.writeInt( actual_checksum )
- outbound.writeInt(batch_length)
-
- // Actually write the record to the file..
- buffer.writeTo(os);
-
- length.addAndGet( buffer.length() )
-
- batch_length = 0
- outbound.reset()
+ def flush = {
+ if( os.position() > 0 ) {
+ val buffer = os.toBuffer.toByteBuffer
+ val pos = length.get()-buffer.remaining
+ trace("wrote at "+pos+" "+os.toBuffer)
+ channel.write(buffer, pos)
+ if( buffer.hasRemaining ) {
+ throw new IOException("Short write")
+ }
+ os.reset()
}
}
/**
* returns the offset position of the data record.
*/
- def append(id:Byte, data: Array[Byte]): Long = {
- assert(id != LOG_HEADER_PREFIX(0))
- if( batch_length!=0 && (batch_length + data.length > write_buffer_size)
) {
+ def append(id:Byte, data: Buffer): Long = {
+ val rc = limit
+ val data_length = data.length
+ val total_length = LOG_HEADER_SIZE + data_length
+
+ if( os.position() + total_length > BUFFER_SIZE ) {
flush
}
- if( batch_length==0 ) {
- // first data pos record is offset by the log header.
- outbound.skip(LOG_HEADER_SIZE);
- limit += LOG_HEADER_SIZE
- }
- val rc = limit;
-
- val start = outbound.position
- outbound.writeByte(id);
- outbound.writeInt(data.length)
- outbound.write(data);
- val count = outbound.position - start
- limit += count
- batch_length += count
- rc
- }
+ if( total_length > (BUFFER_SIZE<<2) ) {
- def close = {
- flush
- channel.truncate(length.get())
- os.close()
- }
- }
+ // Write the header and flush..
+ os.writeByte(LOG_HEADER_PREFIX)
+ os.writeByte(id)
+ os.writeInt(checksum(data))
+ os.writeInt(data_length)
- case class LogReader(file:File, start:Long) {
+ length.addAndGet(LOG_HEADER_PREFIX)
+ flush
- val is = new RandomAccessFile(file, "r")
+ // Directly write the data to the channel since it's large.
+ val buffer = data.toByteBuffer
+ val pos = length.get()+LOG_HEADER_PREFIX
+ trace("wrote at "+pos+" "+data)
+ channel.write(buffer, pos)
+ if( buffer.hasRemaining ) {
+ throw new IOException("Short write")
+ }
+ length.addAndGet(data_length)
- def read(pos:Long) = this.synchronized {
- is.seek(pos-start)
- val id = is.read()
- if( id == LOG_HEADER_PREFIX(0) ) {
- (id, null, pos+LOG_HEADER_SIZE)
} else {
- val length = is.readInt()
- val data = new Array[Byte](length)
- is.readFully(data)
- (id, data, start+is.getFilePointer)
+ os.writeByte(LOG_HEADER_PREFIX)
+ os.writeByte(id)
+ os.writeInt(checksum(data))
+ os.writeInt(data_length)
+ os.write(data.data, data.offset, data_length)
+ length.addAndGet(total_length)
}
+ rc
+ }
+
+ }
+
+ case class LogReader(file:File, start:Long) extends BaseRetained {
+
+ val fd = open
+
+ def open = new RandomAccessFile(file, "r")
+
+ def channel = fd.getChannel
+
+ override def dispose() {
+ fd.close()
}
def read(pos:Long, length:Int) = this.synchronized {
- is.seek((pos-start)+5)
- val data = new Array[Byte](length)
- is.readFully(data)
- data
+ val offset = (pos-start).toInt
+ if(paranoidChecks) {
+ val record = new Buffer(LOG_HEADER_SIZE+length)
+ if( channel.read(record.toByteBuffer, offset) != record.length ) {
+ val data2 = new Buffer(LOG_HEADER_SIZE+length)
+ channel.read(data2.toByteBuffer, offset)
+ throw new IOException("short record at position: "+pos+" in file:
"+file+", offset: "+offset)
+ }
+
+ val is = new DataByteArrayInputStream(record)
+ val prefix = is.readByte()
+ if( prefix != LOG_HEADER_PREFIX ) {
+ throw new IOException("invalid record at position: "+pos+" in file:
"+file+", offset: "+offset)
+ }
+
+ val id = is.readByte()
+ val expectedChecksum = is.readInt()
+ val expectedLength = is.readInt()
+ val data = is.readBuffer(length)
+
+ // If you're reading the whole record we can verify the data checksum
+ if( expectedLength == length ) {
+ if( expectedChecksum != checksum(data) ) {
+ throw new IOException("checksum does not match at position:
"+pos+" in file: "+file+", offset: "+offset)
+ }
+ }
+
+ data
+ } else {
+ val data = new Buffer(length)
+ if( channel.read(data.toByteBuffer, offset+LOG_HEADER_SIZE) !=
data.length ) {
+ throw new IOException("short record at position: "+pos+" in file:
"+file+", offset: "+offset)
+ }
+ data
+ }
}
- def close = this.synchronized {
- is.close()
+ def read(pos:Long) = this.synchronized {
+ val offset = (pos-start).toInt
+ val header = new Buffer(LOG_HEADER_SIZE)
+ channel.read(header.toByteBuffer, offset)
+ val is = header.bigEndianEditor();
+ val prefix = is.readByte()
+ if( prefix != LOG_HEADER_PREFIX ) {
+ // Does not look like a record.
+ throw new IOException("invalid record position")
+ }
+ val id = is.readByte()
+ val expectedChecksum = is.readInt()
+ val length = is.readInt()
+ val data = new Buffer(length)
+
+ if( channel.read(data.toByteBuffer, offset+LOG_HEADER_SIZE) != length ) {
+ throw new IOException("short record")
+ }
+
+ if(paranoidChecks) {
+ if( expectedChecksum != checksum(data) ) {
+ throw new IOException("checksum does not match")
+ }
+ }
+ (id, data, pos+LOG_HEADER_SIZE+length)
}
- def next_position(verify_checksums:Boolean=true):Long = this.synchronized {
- var offset = 0;
- val prefix = new Array[Byte](LOG_HEADER_PREFIX.length)
- var done = false
- while(!done) {
- try {
- is.seek(offset)
- is.readFully(prefix)
- if( !Arrays.equals(prefix, LOG_HEADER_PREFIX) ) {
- throw new IOException("Missing header prefix");
- }
- val expected_checksum = is.readInt();
+ def check(pos:Long):Option[Long] = this.synchronized {
+ var offset = (pos-start).toInt
+ val header = new Buffer(LOG_HEADER_SIZE)
+ channel.read(header.toByteBuffer, offset)
+ val is = header.bigEndianEditor();
+ val prefix = is.readByte()
+ if( prefix != LOG_HEADER_PREFIX ) {
+ return None // Does not look like a record.
+ }
+ val id = is.readByte()
+ val expectedChecksum = is.readInt()
+ val length = is.readInt()
+
+ val chunk = new Buffer(1024*4)
+ val chunkbb = chunk.toByteBuffer
+ offset += LOG_HEADER_SIZE
+
+ // Read the data in chunks to avoid
+ // OOME if we are checking an invalid record
+ // with a bad record length
+ val checksumer = new CRC32
+ var remaining = length
+ while( remaining > 0 ) {
+ val chunkSize = remaining.min(1024*4);
+ chunkbb.position(0)
+ chunkbb.limit(chunkSize)
+ channel.read(chunkbb, offset)
+ if( chunkbb.hasRemaining ) {
+ return None
+ }
+ checksumer.update(chunk.data, 0, chunkSize)
+ offset += chunkSize
+ remaining -= chunkSize
+ }
- val length = is.readInt();
- if (verify_checksums) {
- val data = new Array[Byte](length)
- is.readFully(data)
-
- val checksum = new CRC32
- checksum.update(data)
- val actual_checksum = (checksum.getValue & 0xFFFFFFFF).toInt
-
- if( expected_checksum != actual_checksum ) {
- throw new IOException("Data checksum missmatch");
- }
- }
- offset += LOG_HEADER_SIZE + length
+ val checksum = ( checksumer.getValue & 0xFFFFFFFF).toInt
+ if( expectedChecksum != checksum ) {
+ return None
+ }
+ return Some(pos+LOG_HEADER_SIZE+length)
+ }
- } catch {
- case e:IOException =>
- done = true
+ def verifyAndGetEndPosition:Long = this.synchronized {
+ var pos = start;
+ val limit = start+channel.size()
+ while(pos < limit) {
+ check(pos) match {
+ case Some(next) => pos = next
+ case None => return pos
}
}
- start + offset
+ pos
}
}
@@ -254,17 +318,17 @@ case class RecordLog(directory: File, lo
def open = {
log_mutex.synchronized {
- log_infos = LevelDBClient.find_sequence_files(directory, log_suffix).map
{ case (position,file) =>
+ log_infos = LevelDBClient.find_sequence_files(directory, logSuffix).map
{ case (position,file) =>
position -> LogInfo(file, position, new AtomicLong(file.length()))
}
- val append_pos = if( log_infos.isEmpty ) {
+ val appendPos = if( log_infos.isEmpty ) {
0L
} else {
val (_, file) = log_infos.last
val r = LogReader(file.file, file.position)
try {
- val rc = r.next_position()
+ val rc = r.verifyAndGetEndPosition
file.length.set(rc - file.position)
if( file.file.length != file.length.get() ) {
// we need to truncate.
@@ -272,23 +336,24 @@ case class RecordLog(directory: File, lo
}
rc
} finally {
- r.close
+ r.release()
}
}
- create_appender(append_pos)
+ create_appender(appendPos)
}
}
+
def close = {
log_mutex.synchronized {
- current_appender.close
+ current_appender.release
}
}
def appender_limit = current_appender.limit
def appender_start = current_appender.start
- def next_log(position:Long) = LevelDBClient.create_sequence_file(directory,
position, log_suffix)
+ def next_log(position:Long) = LevelDBClient.create_sequence_file(directory,
position, logSuffix)
def appender[T](func: (LogAppender)=>T):T= {
try {
@@ -296,8 +361,8 @@ case class RecordLog(directory: File, lo
} finally {
current_appender.flush
log_mutex.synchronized {
- if ( current_appender.length.get >= log_size ) {
- current_appender.close
+ if ( current_appender.length.get >= logSize ) {
+ current_appender.release()
on_log_rotate()
create_appender(current_appender.limit)
}
@@ -307,50 +372,49 @@ case class RecordLog(directory: File, lo
var on_log_rotate: ()=>Unit = ()=>{}
- val next_reader_id = new LongCounter()
- val reader_cache_files = new HashMap[File, HashSet[Long]];
- val reader_cache_readers = new LRUCache[Long, LogReader](100) {
- protected override def onCacheEviction(entry: Entry[Long, LogReader]) = {
- var key = entry.getKey
- var value = entry.getValue
- value.close
-
- val set = reader_cache_files.get(value.file).get
- set.remove(key)
- if( set.isEmpty ) {
- reader_cache_files.remove(value.file)
- }
+ private val reader_cache = new LRUCache[File, LogReader](100) {
+ protected override def onCacheEviction(entry: Entry[File, LogReader]) = {
+ entry.getValue.release()
}
}
def log_info(pos:Long) = log_mutex.synchronized(log_infos.range(0L,
pos+1).lastOption.map(_._2))
private def get_reader[T](pos:Long)(func: (LogReader)=>T) = {
- val info = log_info(pos)
- info.map { info =>
- // Checkout a reader from the cache...
- val (set, reader_id, reader) = reader_cache_files.synchronized {
- var set = reader_cache_files.getOrElseUpdate(info.file, new HashSet);
- if( set.isEmpty ) {
- val reader_id = next_reader_id.getAndIncrement()
- val reader = new LogReader(info.file, info.position)
- set.add(reader_id)
- reader_cache_readers.put(reader_id, reader)
- (set, reader_id, reader)
+
+ val lookup = log_mutex.synchronized {
+ val info = log_info(pos)
+ info.map { info=>
+ if(info.position == current_appender.start) {
+ current_appender.retain()
+ (info, current_appender)
} else {
- val reader_id = set.head
- set.remove(reader_id)
- (set, reader_id, reader_cache_readers.get(reader_id))
+ (info, null)
+ }
+ }
+ }
+
+ lookup.map { case (info, appender) =>
+ val reader = if( appender!=null ) {
+ // read from the current appender.
+ appender
+ } else {
+ // Checkout a reader from the cache...
+ reader_cache.synchronized {
+ var reader = reader_cache.get(info.file)
+ if(reader==null) {
+ reader = LogReader(info.file, info.position)
+ reader_cache.put(info.file, reader)
+ }
+ reader.retain()
+ reader
}
}
try {
func(reader)
} finally {
- // check him back in..
- reader_cache_files.synchronized {
- set.add(reader_id)
- }
+ reader.release
}
}
}