DL-162: Use log segment entry store interface

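- Use the log segment entry store interface (LogSegmentEntryStore) in the log handlers and ReadUtils, replacing direct use of BookKeeperClient and LedgerHandleCache
- Delete the old readahead implementation (ReadAheadWorker, ReadAheadCache, ReadAheadTracker, ReadAheadPhase, ReadAheadExceptionsLogger) together with LedgerHandleCache and LedgerDescriptor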


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/d871e657
Tree: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/d871e657
Diff: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/d871e657

Branch: refs/heads/master
Commit: d871e6570fe49cab56fcec37d3a8a6bd91afbe4d
Parents: 47622a6
Author: Sijie Guo <sij...@twitter.com>
Authored: Wed Dec 28 17:09:57 2016 -0800
Committer: Sijie Guo <sij...@twitter.com>
Committed: Thu Dec 29 02:13:02 2016 -0800

----------------------------------------------------------------------
 .../distributedlog/BKAsyncLogReader.java        |   68 +-
 .../distributedlog/BKDistributedLogManager.java |   59 +-
 .../BKDistributedLogNamespace.java              |   10 -
 .../twitter/distributedlog/BKLogHandler.java    |   33 +-
 .../distributedlog/BKLogReadHandler.java        |   15 +-
 .../distributedlog/BKLogWriteHandler.java       |   46 +-
 .../distributedlog/LedgerDescriptor.java        |   67 -
 .../distributedlog/LedgerHandleCache.java       |  463 ------
 .../twitter/distributedlog/ReadAheadCache.java  |  233 ---
 .../com/twitter/distributedlog/ReadUtils.java   |  204 +--
 .../admin/DistributedLogAdmin.java              |   52 +-
 .../impl/logsegment/BKLogSegmentEntryStore.java |  136 +-
 .../BKLogSegmentRandomAccessEntryReader.java    |  119 ++
 .../logsegment/LogSegmentEntryStore.java        |   17 +
 .../LogSegmentRandomAccessEntryReader.java      |   47 +
 .../readahead/ReadAheadPhase.java               |   45 -
 .../readahead/ReadAheadTracker.java             |  104 --
 .../readahead/ReadAheadWorker.java              | 1503 ------------------
 .../stats/ReadAheadExceptionsLogger.java        |   60 -
 .../distributedlog/TestLedgerHandleCache.java   |  180 ---
 .../TestReadAheadEntryReader.java               |    2 +-
 .../twitter/distributedlog/TestReadUtils.java   |   44 +-
 .../twitter/distributedlog/admin/TestDLCK.java  |    9 +-
 .../logsegment/TestBKLogSegmentEntryReader.java |    2 +-
 .../mapreduce/DistributedLogInputFormat.java    |    2 +-
 .../mapreduce/LogSegmentReader.java             |    2 +-
 .../mapreduce/LogSegmentSplit.java              |    4 +-
 27 files changed, 509 insertions(+), 3017 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java
index 18d2e15..b9d0365 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java
@@ -409,43 +409,39 @@ class BKAsyncLogReader implements AsyncLogReader, 
Runnable, AsyncNotification {
         final PendingReadRequest readRequest = new 
PendingReadRequest(numEntries, deadlineTime, deadlineTimeUnit);
 
         if (null == readAheadReader) {
-            try {
-                final ReadAheadEntryReader readAheadEntryReader = 
this.readAheadReader = new ReadAheadEntryReader(
-                        getStreamName(),
-                        getStartDLSN(),
-                        bkDistributedLogManager.getConf(),
-                        readHandler,
-                        bkDistributedLogManager.getReaderEntryStore(),
-                        bkDistributedLogManager.getScheduler(),
-                        Ticker.systemTicker(),
-                        bkDistributedLogManager.alertStatsLogger);
-                readHandler.checkLogStreamExists().addEventListener(new 
FutureEventListener<Void>() {
-                    @Override
-                    public void onSuccess(Void value) {
-                        try {
-                            readHandler.registerListener(readAheadEntryReader);
-                            readHandler.asyncStartFetchLogSegments()
-                                    .map(new 
AbstractFunction1<Versioned<List<LogSegmentMetadata>>, BoxedUnit>() {
-                                        @Override
-                                        public BoxedUnit 
apply(Versioned<List<LogSegmentMetadata>> logSegments) {
-                                            
readAheadEntryReader.addStateChangeNotification(BKAsyncLogReader.this);
-                                            
readAheadEntryReader.start(logSegments.getValue());
-                                            return BoxedUnit.UNIT;
-                                        }
-                                    });
-                        } catch (Exception exc) {
-                            notifyOnError(exc);
-                        }
+            final ReadAheadEntryReader readAheadEntryReader = 
this.readAheadReader = new ReadAheadEntryReader(
+                    getStreamName(),
+                    getStartDLSN(),
+                    bkDistributedLogManager.getConf(),
+                    readHandler,
+                    bkDistributedLogManager.getReaderEntryStore(),
+                    bkDistributedLogManager.getScheduler(),
+                    Ticker.systemTicker(),
+                    bkDistributedLogManager.alertStatsLogger);
+            readHandler.checkLogStreamExists().addEventListener(new 
FutureEventListener<Void>() {
+                @Override
+                public void onSuccess(Void value) {
+                    try {
+                        readHandler.registerListener(readAheadEntryReader);
+                        readHandler.asyncStartFetchLogSegments()
+                                .map(new 
AbstractFunction1<Versioned<List<LogSegmentMetadata>>, BoxedUnit>() {
+                                    @Override
+                                    public BoxedUnit 
apply(Versioned<List<LogSegmentMetadata>> logSegments) {
+                                        
readAheadEntryReader.addStateChangeNotification(BKAsyncLogReader.this);
+                                        
readAheadEntryReader.start(logSegments.getValue());
+                                        return BoxedUnit.UNIT;
+                                    }
+                                });
+                    } catch (Exception exc) {
+                        notifyOnError(exc);
                     }
+                }
 
-                    @Override
-                    public void onFailure(Throwable cause) {
-                        notifyOnError(cause);
-                    }
-                });
-            } catch (IOException ioe) {
-                notifyOnError(ioe);
-            }
+                @Override
+                public void onFailure(Throwable cause) {
+                    notifyOnError(cause);
+                }
+            });
         }
 
         if (checkClosedOrInError("readNext")) {
@@ -598,6 +594,8 @@ class BKAsyncLogReader implements AsyncLogReader, Runnable, 
AsyncNotification {
                 }
                 lastProcessTime.reset().start();
 
+                lastProcessTime.reset().start();
+
                 // If the oldest pending promise is interrupted then we must 
mark
                 // the reader in error and abort all pending reads since we 
dont
                 // know the last consumed read

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
index 219c0cf..d20cc6a 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
@@ -50,7 +50,6 @@ import 
com.twitter.distributedlog.logsegment.LogSegmentMetadataCache;
 import com.twitter.distributedlog.metadata.BKDLConfig;
 import com.twitter.distributedlog.metadata.LogStreamMetadataStore;
 import com.twitter.distributedlog.stats.BroadCastStatsLogger;
-import com.twitter.distributedlog.stats.ReadAheadExceptionsLogger;
 import com.twitter.distributedlog.subscription.SubscriptionStateStore;
 import com.twitter.distributedlog.subscription.SubscriptionsStore;
 import com.twitter.distributedlog.subscription.ZKSubscriptionStateStore;
@@ -72,8 +71,8 @@ import com.twitter.util.Future;
 import com.twitter.util.FuturePool;
 import com.twitter.util.FutureEventListener;
 import com.twitter.util.Promise;
-import org.apache.bookkeeper.stats.AlertStatsLogger;
 import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.stats.AlertStatsLogger;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
@@ -132,7 +131,6 @@ class BKDistributedLogManager extends ZKMetadataAccessor 
implements DistributedL
                 }
             };
 
-
     private final String clientId;
     private final int regionId;
     private final String streamIdentifier;
@@ -160,10 +158,11 @@ class BKDistributedLogManager extends ZKMetadataAccessor 
implements DistributedL
     //       instantiating readers or writers.
     private final BookKeeperClientBuilder writerBKCBuilder;
     private final BookKeeperClient writerBKC;
+    private final LogSegmentEntryStore writerEntryStore;
     private final boolean ownWriterBKC;
     private final BookKeeperClientBuilder readerBKCBuilder;
     private final BookKeeperClient readerBKC;
-    private LogSegmentEntryStore readerEntryStore = null;
+    private final LogSegmentEntryStore readerEntryStore;
     private final boolean ownReaderBKC;
 
     //
@@ -179,7 +178,6 @@ class BKDistributedLogManager extends ZKMetadataAccessor 
implements DistributedL
     private BKLogReadHandler readHandlerForListener = null;
     private FuturePool readerFuturePool = null;
     private final PendingReaders pendingReaders;
-    private final ReadAheadExceptionsLogger readAheadExceptionsLogger;
 
     // Failure Injector
     private final AsyncFailureInjector failureInjector;
@@ -230,7 +228,6 @@ class BKDistributedLogManager extends ZKMetadataAccessor 
implements DistributedL
              null,
              null,
              null,
-             new ReadAheadExceptionsLogger(statsLogger),
              DistributedLogConstants.UNKNOWN_CLIENT_ID,
              DistributedLogConstants.LOCAL_REGION_ID,
              null,
@@ -259,7 +256,6 @@ class BKDistributedLogManager extends ZKMetadataAccessor 
implements DistributedL
      * @param readAheadScheduler readAhead scheduler used by readers
      * @param channelFactory client socket channel factory to build bookkeeper 
clients
      * @param requestTimer request timer to build bookkeeper clients
-     * @param readAheadExceptionsLogger stats logger to record readahead 
exceptions
      * @param clientId client id that used to initiate the locks
      * @param regionId region id that would be encrypted as part of log 
segment metadata
      *                 to indicate which region that the log segment will be 
created
@@ -287,7 +283,6 @@ class BKDistributedLogManager extends ZKMetadataAccessor 
implements DistributedL
                             OrderedScheduler readAheadScheduler,
                             ClientSocketChannelFactory channelFactory,
                             HashedWheelTimer requestTimer,
-                            ReadAheadExceptionsLogger 
readAheadExceptionsLogger,
                             String clientId,
                             Integer regionId,
                             LedgerAllocator ledgerAllocator,
@@ -296,7 +291,6 @@ class BKDistributedLogManager extends ZKMetadataAccessor 
implements DistributedL
                             StatsLogger statsLogger,
                             StatsLogger perLogStatsLogger) throws IOException {
         super(name, conf, uri, writerZKCBuilder, readerZKCBuilder, 
statsLogger);
-        Preconditions.checkNotNull(readAheadExceptionsLogger, "No ReadAhead 
Stats Logger Provided.");
         this.conf = conf;
         this.dynConf = dynConf;
         this.scheduler = scheduler;
@@ -366,6 +360,12 @@ class BKDistributedLogManager extends ZKMetadataAccessor 
implements DistributedL
             this.ownWriterBKC = false;
         }
         this.writerBKC = this.writerBKCBuilder.build();
+        this.writerEntryStore = new BKLogSegmentEntryStore(
+                conf,
+                writerBKC,
+                scheduler,
+                statsLogger,
+                failureInjector);
 
         // create the bkc for readers
         if (null == readerBKCBuilder) {
@@ -395,13 +395,18 @@ class BKDistributedLogManager extends ZKMetadataAccessor 
implements DistributedL
             this.ownReaderBKC = false;
         }
         this.readerBKC = this.readerBKCBuilder.build();
+        this.readerEntryStore = new BKLogSegmentEntryStore(
+                conf,
+                readerBKC,
+                scheduler,
+                statsLogger,
+                failureInjector);
 
         // Feature Provider
         this.featureProvider = featureProvider;
 
         // Stats
         this.alertStatsLogger = new AlertStatsLogger(this.perLogStatsLogger, 
"dl_alert");
-        this.readAheadExceptionsLogger = readAheadExceptionsLogger;
     }
 
     @VisibleForTesting
@@ -431,15 +436,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor 
implements DistributedL
         return this.readerBKC;
     }
 
-    synchronized LogSegmentEntryStore getReaderEntryStore() throws IOException 
{
-        if (null == readerEntryStore) {
-            readerEntryStore = new BKLogSegmentEntryStore(
-                conf,
-                readerBKC.get(),
-                scheduler,
-                statsLogger,
-                failureInjector);
-        }
+    LogSegmentEntryStore getReaderEntryStore() {
         return this.readerEntryStore;
     }
 
@@ -541,9 +538,9 @@ class BKDistributedLogManager extends ZKMetadataAccessor 
implements DistributedL
                 subscriberId,
                 conf,
                 dynConf,
-                readerBKCBuilder,
                 readerMetadataStore,
                 logSegmentMetadataCache,
+                readerEntryStore,
                 scheduler,
                 alertStatsLogger,
                 statsLogger,
@@ -622,9 +619,9 @@ class BKDistributedLogManager extends ZKMetadataAccessor 
implements DistributedL
         final BKLogWriteHandler writeHandler = new BKLogWriteHandler(
                 logMetadata,
                 conf,
-                writerBKCBuilder,
                 writerMetadataStore,
                 logSegmentMetadataCache,
+                writerEntryStore,
                 scheduler,
                 allocator,
                 statsLogger,
@@ -821,33 +818,25 @@ class BKDistributedLogManager extends ZKMetadataAccessor 
implements DistributedL
         if (segmentIdx < 0) {
             return Future.value(new 
DLSN(segments.get(0).getLogSegmentSequenceNumber(), 0L, 0L));
         }
-        final LedgerHandleCache handleCache =
-                
LedgerHandleCache.newBuilder().bkc(readerBKC).conf(conf).build();
         return getDLSNNotLessThanTxIdInSegment(
                 fromTxnId,
                 segmentIdx,
                 segments,
-                handleCache
-        ).ensure(new AbstractFunction0<BoxedUnit>() {
-            @Override
-            public BoxedUnit apply() {
-                handleCache.clear();
-                return BoxedUnit.UNIT;
-            }
-        });
+                readerEntryStore
+        );
     }
 
     private Future<DLSN> getDLSNNotLessThanTxIdInSegment(final long fromTxnId,
                                                          final int segmentIdx,
                                                          final 
List<LogSegmentMetadata> segments,
-                                                         final 
LedgerHandleCache handleCache) {
+                                                         final 
LogSegmentEntryStore entryStore) {
         final LogSegmentMetadata segment = segments.get(segmentIdx);
         return ReadUtils.getLogRecordNotLessThanTxId(
                 name,
                 segment,
                 fromTxnId,
                 scheduler,
-                handleCache,
+                entryStore,
                 Math.max(2, dynConf.getReadAheadBatchSize())
         ).flatMap(new AbstractFunction1<Optional<LogRecordWithDLSN>, 
Future<DLSN>>() {
             @Override
@@ -870,7 +859,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor 
implements DistributedL
                             fromTxnId,
                             segmentIdx + 1,
                             segments,
-                            handleCache);
+                            entryStore);
                 }
             }
         });
@@ -915,7 +904,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor 
implements DistributedL
      * </p>
      *
      * @see DLUtils#findLogSegmentNotLessThanTxnId(List, long)
-     * @see ReadUtils#getLogRecordNotLessThanTxId(String, LogSegmentMetadata, 
long, ExecutorService, LedgerHandleCache, int)
+     * @see ReadUtils#getLogRecordNotLessThanTxId(String, LogSegmentMetadata, 
long, ExecutorService, LogSegmentEntryStore, int)
      * @param fromTxnId
      *          transaction id to start reading from
      * @return future representing the open result.

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
index e7f29cc..1a23228 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
@@ -43,7 +43,6 @@ import com.twitter.distributedlog.metadata.BKDLConfig;
 import com.twitter.distributedlog.metadata.LogMetadataStore;
 import com.twitter.distributedlog.metadata.LogStreamMetadataStore;
 import com.twitter.distributedlog.namespace.DistributedLogNamespace;
-import com.twitter.distributedlog.stats.ReadAheadExceptionsLogger;
 import com.twitter.distributedlog.util.ConfUtils;
 import com.twitter.distributedlog.util.DLUtils;
 import com.twitter.distributedlog.util.FutureUtils;
@@ -113,10 +112,6 @@ import static com.twitter.distributedlog.impl.BKDLUtils.*;
  * See {@link PermitLimiter}.
  * </ul>
  *
- * <h4>ReadAhead Exceptions</h4>
- * Stats about exceptions that encountered in ReadAhead are exposed under 
<code>`scope`/exceptions</code>.
- * See {@link ReadAheadExceptionsLogger}.
- *
  * <h4>DistributedLogManager</h4>
  *
  * All the core stats about reader and writer are exposed under current scope 
via {@link BKDistributedLogManager}.
@@ -305,7 +300,6 @@ public class BKDistributedLogNamespace implements 
DistributedLogNamespace {
     // Stats Loggers
     private final StatsLogger statsLogger;
     private final StatsLogger perLogStatsLogger;
-    private final ReadAheadExceptionsLogger readAheadExceptionsLogger;
 
     protected AtomicBoolean closed = new AtomicBoolean(false);
 
@@ -436,9 +430,6 @@ public class BKDistributedLogNamespace implements 
DistributedLogNamespace {
             allocator = null;
         }
 
-        // Stats Loggers
-        this.readAheadExceptionsLogger = new 
ReadAheadExceptionsLogger(statsLogger);
-
         // log metadata store
         if (bkdlConfig.isFederatedNamespace() || 
conf.isFederatedNamespaceEnabled()) {
             this.metadataStore = new FederatedZKLogMetadataStore(conf, 
namespace, sharedReaderZKCForDL, scheduler);
@@ -895,7 +886,6 @@ public class BKDistributedLogNamespace implements 
DistributedLogNamespace {
                 readAheadExecutor,                  /* Read Aheader Executor */
                 channelFactory,                     /* Netty Channel Factory */
                 requestTimer,                       /* Request Timer */
-                readAheadExceptionsLogger,          /* ReadAhead Exceptions 
Logger */
                 clientId,                           /* Client Id */
                 regionId,                           /* Region Id */
                 dlmLedgerAlloctor,                  /* Ledger Allocator */

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
index caee864..0cf8ed5 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
@@ -17,12 +17,12 @@
  */
 package com.twitter.distributedlog;
 
-import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import com.twitter.distributedlog.callback.LogSegmentNamesListener;
 import com.twitter.distributedlog.exceptions.LogEmptyException;
 import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException;
 import com.twitter.distributedlog.exceptions.UnexpectedException;
+import com.twitter.distributedlog.logsegment.LogSegmentEntryStore;
 import com.twitter.distributedlog.metadata.LogMetadata;
 import com.twitter.distributedlog.io.AsyncAbortable;
 import com.twitter.distributedlog.io.AsyncCloseable;
@@ -45,8 +45,6 @@ import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction0;
-import scala.runtime.BoxedUnit;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -89,10 +87,10 @@ public abstract class BKLogHandler implements 
AsyncCloseable, AsyncAbortable {
 
     protected final LogMetadata logMetadata;
     protected final DistributedLogConfiguration conf;
-    protected final BookKeeperClient bookKeeperClient;
     protected final LogStreamMetadataStore streamMetadataStore;
     protected final LogSegmentMetadataStore metadataStore;
     protected final LogSegmentMetadataCache metadataCache;
+    protected final LogSegmentEntryStore entryStore;
     protected final int firstNumEntriesPerReadLastRecordScan;
     protected final int maxNumEntriesPerReadLastRecordScan;
     protected volatile long lastLedgerRollingTimeMillis = -1;
@@ -122,14 +120,13 @@ public abstract class BKLogHandler implements 
AsyncCloseable, AsyncAbortable {
      */
     BKLogHandler(LogMetadata metadata,
                  DistributedLogConfiguration conf,
-                 BookKeeperClientBuilder bkcBuilder,
                  LogStreamMetadataStore streamMetadataStore,
                  LogSegmentMetadataCache metadataCache,
+                 LogSegmentEntryStore entryStore,
                  OrderedScheduler scheduler,
                  StatsLogger statsLogger,
                  AlertStatsLogger alertStatsLogger,
                  String lockClientId) {
-        Preconditions.checkNotNull(bkcBuilder);
         this.logMetadata = metadata;
         this.conf = conf;
         this.scheduler = scheduler;
@@ -140,10 +137,10 @@ public abstract class BKLogHandler implements 
AsyncCloseable, AsyncAbortable {
                 conf.isLogSegmentSequenceNumberValidationEnabled());
         firstNumEntriesPerReadLastRecordScan = 
conf.getFirstNumEntriesPerReadLastRecordScan();
         maxNumEntriesPerReadLastRecordScan = 
conf.getMaxNumEntriesPerReadLastRecordScan();
-        this.bookKeeperClient = bkcBuilder.build();
         this.streamMetadataStore = streamMetadataStore;
         this.metadataStore = streamMetadataStore.getLogSegmentMetadataStore();
         this.metadataCache = metadataCache;
+        this.entryStore = entryStore;
         this.lockClientId = lockClientId;
 
         // Traces
@@ -293,8 +290,6 @@ public abstract class BKLogHandler implements 
AsyncCloseable, AsyncAbortable {
     }
 
     private Future<LogRecordWithDLSN> 
asyncReadFirstUserRecord(LogSegmentMetadata ledger, DLSN beginDLSN) {
-        final LedgerHandleCache handleCache =
-                
LedgerHandleCache.newBuilder().bkc(bookKeeperClient).conf(conf).build();
         return ReadUtils.asyncReadFirstUserRecord(
                 getFullyQualifiedName(),
                 ledger,
@@ -302,15 +297,9 @@ public abstract class BKLogHandler implements 
AsyncCloseable, AsyncAbortable {
                 maxNumEntriesPerReadLastRecordScan,
                 new AtomicInteger(0),
                 scheduler,
-                handleCache,
+                entryStore,
                 beginDLSN
-        ).ensure(new AbstractFunction0<BoxedUnit>() {
-            @Override
-            public BoxedUnit apply() {
-                handleCache.clear();
-                return BoxedUnit.UNIT;
-            }
-        });
+        );
     }
 
     /**
@@ -422,8 +411,6 @@ public abstract class BKLogHandler implements 
AsyncCloseable, AsyncAbortable {
                                                          final boolean 
includeEndOfStream) {
         final AtomicInteger numRecordsScanned = new AtomicInteger(0);
         final Stopwatch stopwatch = Stopwatch.createStarted();
-        final LedgerHandleCache handleCache =
-                
LedgerHandleCache.newBuilder().bkc(bookKeeperClient).conf(conf).build();
         return ReadUtils.asyncReadLastRecord(
                 getFullyQualifiedName(),
                 l,
@@ -434,7 +421,7 @@ public abstract class BKLogHandler implements 
AsyncCloseable, AsyncAbortable {
                 maxNumEntriesPerReadLastRecordScan,
                 numRecordsScanned,
                 scheduler,
-                handleCache
+                entryStore
         ).addEventListener(new FutureEventListener<LogRecordWithDLSN>() {
             @Override
             public void onSuccess(LogRecordWithDLSN value) {
@@ -446,12 +433,6 @@ public abstract class BKLogHandler implements 
AsyncCloseable, AsyncAbortable {
             public void onFailure(Throwable cause) {
                 
recoverLastEntryStats.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
             }
-        }).ensure(new AbstractFunction0<BoxedUnit>() {
-            @Override
-            public BoxedUnit apply() {
-                handleCache.clear();
-                return BoxedUnit.UNIT;
-            }
         });
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
index 67c584c..8aa00e7 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
@@ -34,6 +34,7 @@ import com.twitter.distributedlog.exceptions.LockingException;
 import com.twitter.distributedlog.exceptions.LogNotFoundException;
 import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException;
 import com.twitter.distributedlog.exceptions.UnexpectedException;
+import com.twitter.distributedlog.logsegment.LogSegmentEntryStore;
 import com.twitter.distributedlog.metadata.LogMetadataForReader;
 import com.twitter.distributedlog.lock.DistributedLock;
 import com.twitter.distributedlog.logsegment.LogSegmentFilter;
@@ -106,7 +107,6 @@ class BKLogReadHandler extends BKLogHandler implements 
LogSegmentNamesListener {
     static final Logger LOG = LoggerFactory.getLogger(BKLogReadHandler.class);
 
     protected final LogMetadataForReader logMetadataForReader;
-    protected final LedgerHandleCache handleCache;
 
     protected final DynamicDistributedLogConfiguration dynConf;
 
@@ -134,9 +134,9 @@ class BKLogReadHandler extends BKLogHandler implements 
LogSegmentNamesListener {
                      Optional<String> subscriberId,
                      DistributedLogConfiguration conf,
                      DynamicDistributedLogConfiguration dynConf,
-                     BookKeeperClientBuilder bkcBuilder,
                      LogStreamMetadataStore streamMetadataStore,
                      LogSegmentMetadataCache metadataCache,
+                     LogSegmentEntryStore entryStore,
                      OrderedScheduler scheduler,
                      AlertStatsLogger alertStatsLogger,
                      StatsLogger statsLogger,
@@ -146,9 +146,9 @@ class BKLogReadHandler extends BKLogHandler implements 
LogSegmentNamesListener {
                      boolean isHandleForReading) {
         super(logMetadata,
                 conf,
-                bkcBuilder,
                 streamMetadataStore,
                 metadataCache,
+                entryStore,
                 scheduler,
                 statsLogger,
                 alertStatsLogger,
@@ -158,12 +158,6 @@ class BKLogReadHandler extends BKLogHandler implements 
LogSegmentNamesListener {
         this.perLogStatsLogger =
                 isHandleForReading ? perLogStatsLogger : 
NullStatsLogger.INSTANCE;
         this.readerStateNotification = readerStateNotification;
-
-        handleCache = LedgerHandleCache.newBuilder()
-                .bkc(this.bookKeeperClient)
-                .conf(conf)
-                .statsLogger(statsLogger)
-                .build();
         this.subscriberId = subscriberId;
     }
 
@@ -265,9 +259,6 @@ class BKLogReadHandler extends BKLogHandler implements 
LogSegmentNamesListener {
                 .flatMap(new AbstractFunction1<Void, Future<Void>>() {
             @Override
             public Future<Void> apply(Void result) {
-                if (null != handleCache) {
-                    handleCache.clear();
-                }
                 // unregister the log segment listener
                 
metadataStore.unregisterLogSegmentListener(logMetadata.getLogSegmentsPath(), 
BKLogReadHandler.this);
                 return Future.Void();

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
index 3f06700..25b25e2 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
@@ -30,6 +30,7 @@ import com.twitter.distributedlog.exceptions.TransactionIdOutOfOrderException;
 import com.twitter.distributedlog.exceptions.UnexpectedException;
 import com.twitter.distributedlog.function.GetLastTxIdFunction;
 import com.twitter.distributedlog.impl.logsegment.BKLogSegmentEntryWriter;
+import com.twitter.distributedlog.logsegment.LogSegmentEntryStore;
 import com.twitter.distributedlog.metadata.LogMetadataForWriter;
 import com.twitter.distributedlog.lock.DistributedLock;
 import com.twitter.distributedlog.logsegment.LogSegmentFilter;
@@ -52,8 +53,6 @@ import com.twitter.util.Function;
 import com.twitter.util.Future;
 import com.twitter.util.FutureEventListener;
 import com.twitter.util.Promise;
-import org.apache.bookkeeper.client.AsyncCallback;
-import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.feature.FeatureProvider;
 import org.apache.bookkeeper.stats.AlertStatsLogger;
@@ -151,9 +150,9 @@ class BKLogWriteHandler extends BKLogHandler {
      */
     BKLogWriteHandler(LogMetadataForWriter logMetadata,
                       DistributedLogConfiguration conf,
-                      BookKeeperClientBuilder bkcBuilder,
                       LogStreamMetadataStore streamMetadataStore,
                       LogSegmentMetadataCache metadataCache,
+                      LogSegmentEntryStore entryStore,
                       OrderedScheduler scheduler,
                       LedgerAllocator allocator,
                       StatsLogger statsLogger,
@@ -167,9 +166,9 @@ class BKLogWriteHandler extends BKLogHandler {
                       DistributedLock lock /** owned by handler **/) {
         super(logMetadata,
                 conf,
-                bkcBuilder,
                 streamMetadataStore,
                 metadataCache,
+                entryStore,
                 scheduler,
                 statsLogger,
                 alertStatsLogger,
@@ -1222,33 +1221,18 @@ class BKLogWriteHandler extends BKLogHandler {
                 deleteOpStats.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
             }
         });
-        try {
-            bookKeeperClient.get().asyncDeleteLedger(ledgerMetadata.getLogSegmentId(), new AsyncCallback.DeleteCallback() {
-                @Override
-                public void deleteComplete(int rc, Object ctx) {
-                    if (BKException.Code.NoSuchLedgerExistsException == rc) {
-                        LOG.warn("No ledger {} found to delete for {} : {}.",
-                                new Object[]{ledgerMetadata.getLogSegmentId(), getFullyQualifiedName(),
-                                        ledgerMetadata});
-                    } else if (BKException.Code.OK != rc) {
-                        BKException bke = BKException.create(rc);
-                        LOG.error("Couldn't delete ledger {} from bookkeeper for {} : ",
-                                new Object[]{ledgerMetadata.getLogSegmentId(), getFullyQualifiedName(), bke});
-                        promise.setException(bke);
-                        return;
-                    }
-                    // after the ledger is deleted, we delete the metadata znode
-                    scheduler.submit(new Runnable() {
-                        @Override
-                        public void run() {
-                            deleteLogSegmentMetadata(ledgerMetadata, promise);
-                        }
-                    });
-                }
-            }, null);
-        } catch (IOException e) {
-            promise.setException(BKException.create(BKException.Code.BookieHandleNotAvailableException));
-        }
+        entryStore.deleteLogSegment(ledgerMetadata)
+                .addEventListener(new FutureEventListener<LogSegmentMetadata>() {
+            @Override
+            public void onFailure(Throwable cause) {
+                FutureUtils.setException(promise, cause);
+            }
+
+            @Override
+            public void onSuccess(LogSegmentMetadata segment) {
+                deleteLogSegmentMetadata(segment, promise);
+            }
+        });
         return promise;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/main/java/com/twitter/distributedlog/LedgerDescriptor.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/LedgerDescriptor.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/LedgerDescriptor.java
deleted file mode 100644
index 5a95e46..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/LedgerDescriptor.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-public class LedgerDescriptor {
-    private final long ledgerId;
-    private final long logSegmentSequenceNo;
-    private final boolean fenced;
-
-    public LedgerDescriptor(long ledgerId, long logSegmentSequenceNo, boolean fenced) {
-        this.ledgerId = ledgerId;
-        this.logSegmentSequenceNo = logSegmentSequenceNo;
-        this.fenced = fenced;
-    }
-
-    public long getLedgerId() {
-        return ledgerId;
-    }
-
-    public long getLogSegmentSequenceNo() {
-        return logSegmentSequenceNo;
-    }
-
-    public boolean isFenced() {
-        return fenced;
-    }
-
-    // Only compares the key portion
-    @Override
-    public boolean equals(Object other) {
-        if (!(other instanceof LedgerDescriptor)) {
-            return false;
-        }
-        LedgerDescriptor key = (LedgerDescriptor) other;
-        return ledgerId == key.ledgerId &&
-            fenced == key.fenced;
-    }
-
-    @Override
-    public int hashCode() {
-        return (int) (ledgerId * 13 ^ (fenced ? 0xFFFF : 0xF0F0) * 17);
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("(lid=").append(ledgerId).append(", lseqno=").append(logSegmentSequenceNo)
-                .append(", fenced=").append(fenced).append(")");
-        return sb.toString();
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/main/java/com/twitter/distributedlog/LedgerHandleCache.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/LedgerHandleCache.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/LedgerHandleCache.java
deleted file mode 100644
index 49896fd..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/LedgerHandleCache.java
+++ /dev/null
@@ -1,463 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
-import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import org.apache.bookkeeper.client.AsyncCallback;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.LedgerEntry;
-import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.commons.lang3.tuple.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Enumeration;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static com.google.common.base.Charsets.UTF_8;
-
-/**
- * A central place on managing open ledgers.
- */
-public class LedgerHandleCache {
-    static final Logger LOG = LoggerFactory.getLogger(LedgerHandleCache.class);
-
-    public static Builder newBuilder() {
-        return new Builder();
-    }
-
-    public static class Builder {
-
-        private BookKeeperClient bkc;
-        private String digestpw;
-        private StatsLogger statsLogger = NullStatsLogger.INSTANCE;
-
-        private Builder() {}
-
-        public Builder bkc(BookKeeperClient bkc) {
-            this.bkc = bkc;
-            return this;
-        }
-
-        public Builder conf(DistributedLogConfiguration conf) {
-            this.digestpw = conf.getBKDigestPW();
-            return this;
-        }
-
-        public Builder statsLogger(StatsLogger statsLogger) {
-            this.statsLogger = statsLogger;
-            return this;
-        }
-
-        public LedgerHandleCache build() {
-            Preconditions.checkNotNull(bkc, "No bookkeeper client is provided");
-            Preconditions.checkNotNull(digestpw, "No bookkeeper digest password is provided");
-            Preconditions.checkNotNull(statsLogger, "No stats logger is provided");
-            return new LedgerHandleCache(bkc, digestpw, statsLogger);
-        }
-    }
-
-    final ConcurrentHashMap<LedgerDescriptor, RefCountedLedgerHandle> 
handlesMap =
-        new ConcurrentHashMap<LedgerDescriptor, RefCountedLedgerHandle>();
-
-    private final BookKeeperClient bkc;
-    private final String digestpw;
-
-    private final OpStatsLogger openStats;
-    private final OpStatsLogger openNoRecoveryStats;
-
-    private LedgerHandleCache(BookKeeperClient bkc, String digestpw, 
StatsLogger statsLogger) {
-        this.bkc = bkc;
-        this.digestpw = digestpw;
-        // Stats
-        openStats = statsLogger.getOpStatsLogger("open_ledger");
-        openNoRecoveryStats = 
statsLogger.getOpStatsLogger("open_ledger_no_recovery");
-    }
-
-    /**
-     * Open the given ledger <i>ledgerDesc</i>.
-     *
-     * @param ledgerDesc
-     *          ledger description
-     * @param callback
-     *          open callback.
-     * @param ctx
-     *          callback context
-     */
-    private void asyncOpenLedger(LedgerDescriptor ledgerDesc, 
AsyncCallback.OpenCallback callback, Object ctx) {
-        try {
-            if (!ledgerDesc.isFenced()) {
-                bkc.get().asyncOpenLedgerNoRecovery(ledgerDesc.getLedgerId(),
-                        BookKeeper.DigestType.CRC32, digestpw.getBytes(UTF_8), 
callback, ctx);
-            } else {
-                bkc.get().asyncOpenLedger(ledgerDesc.getLedgerId(),
-                        BookKeeper.DigestType.CRC32, digestpw.getBytes(UTF_8), 
callback, ctx);
-            }
-        } catch (IOException ace) {
-            // :) when we can't get bkc, it means bookie handle not available
-            
callback.openComplete(BKException.Code.BookieHandleNotAvailableException, null, 
ctx);
-        }
-    }
-
-    /**
-     * Open the log segment.
-     *
-     * @param metadata
-     *          the log segment metadata
-     * @param fence
-     *          whether to fence the log segment during open
-     * @return a future presenting the open result.
-     */
-    public Future<LedgerDescriptor> asyncOpenLedger(LogSegmentMetadata 
metadata, boolean fence) {
-        final Stopwatch stopwatch = Stopwatch.createStarted();
-        final OpStatsLogger openStatsLogger = fence ? openStats : 
openNoRecoveryStats;
-        final Promise<LedgerDescriptor> promise = new 
Promise<LedgerDescriptor>();
-        final LedgerDescriptor ledgerDesc = new 
LedgerDescriptor(metadata.getLogSegmentId(), 
metadata.getLogSegmentSequenceNumber(), fence);
-        RefCountedLedgerHandle refhandle = handlesMap.get(ledgerDesc);
-        if (null == refhandle) {
-            asyncOpenLedger(ledgerDesc, new AsyncCallback.OpenCallback() {
-                @Override
-                public void openComplete(int rc, LedgerHandle lh, Object ctx) {
-                    if (BKException.Code.OK != rc) {
-                        promise.setException(BKException.create(rc));
-                        return;
-                    }
-                    RefCountedLedgerHandle newRefHandle = new 
RefCountedLedgerHandle(lh);
-                    RefCountedLedgerHandle oldRefHandle = 
handlesMap.putIfAbsent(ledgerDesc, newRefHandle);
-                    if (null != oldRefHandle) {
-                        oldRefHandle.addRef();
-                        if (newRefHandle.removeRef()) {
-                            newRefHandle.handle.asyncClose(new 
AsyncCallback.CloseCallback() {
-                                @Override
-                                public void closeComplete(int i, LedgerHandle 
ledgerHandle, Object o) {
-                                    // No action necessary
-                                }
-                            }, null);
-                        }
-                    }
-                    promise.setValue(ledgerDesc);
-                }
-            }, null);
-        } else {
-            refhandle.addRef();
-            promise.setValue(ledgerDesc);
-        }
-        return promise.addEventListener(new 
FutureEventListener<LedgerDescriptor>() {
-            @Override
-            public void onSuccess(LedgerDescriptor value) {
-                
openStatsLogger.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
-            }
-
-            @Override
-            public void onFailure(Throwable cause) {
-                
openStatsLogger.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
-            }
-        });
-    }
-
-    /**
-     * Open a ledger synchronously.
-     *
-     * @param metadata
-     *          log segment metadata
-     * @param fence
-     *          whether to fence the log segment during open
-     * @return ledger descriptor
-     * @throws BKException
-     */
-    public LedgerDescriptor openLedger(LogSegmentMetadata metadata, boolean 
fence) throws BKException {
-        return FutureUtils.bkResult(asyncOpenLedger(metadata, fence));
-    }
-
-    private RefCountedLedgerHandle getLedgerHandle(LedgerDescriptor 
ledgerDescriptor) {
-        return null == ledgerDescriptor ? null : 
handlesMap.get(ledgerDescriptor);
-    }
-
-    /**
-     * Close the ledger asynchronously.
-     *
-     * @param ledgerDesc
-     *          ledger descriptor.
-     * @return future presenting the closing result.
-     */
-    public Future<Void> asyncCloseLedger(LedgerDescriptor ledgerDesc) {
-        final Promise<Void> promise = new Promise<Void>();
-
-        RefCountedLedgerHandle refhandle = getLedgerHandle(ledgerDesc);
-        if ((null != refhandle) && (refhandle.removeRef())) {
-            refhandle = handlesMap.remove(ledgerDesc);
-            if (refhandle.getRefCount() > 0) {
-                // In the rare race condition that a ref count was added 
immediately
-                // after the close de-refed it and the remove was called
-
-                // Try to put the handle back in the map
-                handlesMap.putIfAbsent(ledgerDesc, refhandle);
-
-                // ReadOnlyLedgerHandles don't have much overhead, so lets 
just leave
-                // the handle open even if it had already been replaced
-                promise.setValue(null);
-            } else {
-                refhandle.handle.asyncClose(new AsyncCallback.CloseCallback() {
-                    @Override
-                    public void closeComplete(int rc, LedgerHandle 
ledgerHandle, Object ctx) {
-                        if (BKException.Code.OK == rc) {
-                            promise.setValue(null);
-                        } else {
-                            promise.setException(BKException.create(rc));
-                        }
-                    }
-                }, null);
-            }
-        } else {
-            promise.setValue(null);
-        }
-        return promise;
-    }
-
-    /**
-     * Close the ledger synchronously.
-     *
-     * @param ledgerDesc
-     *          ledger descriptor.
-     * @throws BKException
-     */
-    public void closeLedger(LedgerDescriptor ledgerDesc) throws BKException {
-        FutureUtils.bkResult(asyncCloseLedger(ledgerDesc));
-    }
-
-    /**
-     * Get the last add confirmed of <code>ledgerDesc</code>.
-     *
-     * @param ledgerDesc
-     *          ledger descriptor.
-     * @return last add confirmed of <code>ledgerDesc</code>
-     * @throws BKException
-     */
-    public long getLastAddConfirmed(LedgerDescriptor ledgerDesc) throws 
BKException {
-        RefCountedLedgerHandle refhandle = getLedgerHandle(ledgerDesc);
-
-        if (null == refhandle) {
-            LOG.error("Accessing ledger {} without opening.", ledgerDesc);
-            throw 
BKException.create(BKException.Code.UnexpectedConditionException);
-        }
-
-        return refhandle.handle.getLastAddConfirmed();
-    }
-
-    /**
-     * Whether a ledger is closed or not.
-     *
-     * @param ledgerDesc
-     *          ledger descriptor.
-     * @return true if a ledger is closed, otherwise false.
-     * @throws BKException
-     */
-    public boolean isLedgerHandleClosed(LedgerDescriptor ledgerDesc) throws 
BKException {
-        RefCountedLedgerHandle refhandle = getLedgerHandle(ledgerDesc);
-
-        if (null == refhandle) {
-            LOG.error("Accessing ledger {} without opening.", ledgerDesc);
-            throw 
BKException.create(BKException.Code.UnexpectedConditionException);
-        }
-
-        return refhandle.handle.isClosed();
-    }
-
-    /**
-     * Async try read last confirmed.
-     *
-     * @param ledgerDesc
-     *          ledger descriptor
-     * @return future presenting read last confirmed result.
-     */
-    public Future<Long> asyncTryReadLastConfirmed(LedgerDescriptor ledgerDesc) 
{
-        RefCountedLedgerHandle refHandle = handlesMap.get(ledgerDesc);
-        if (null == refHandle) {
-            LOG.error("Accessing ledger {} without opening.", ledgerDesc);
-            return 
Future.exception(BKException.create(BKException.Code.UnexpectedConditionException));
-        }
-        final Promise<Long> promise = new Promise<Long>();
-        refHandle.handle.asyncTryReadLastConfirmed(new 
AsyncCallback.ReadLastConfirmedCallback() {
-            @Override
-            public void readLastConfirmedComplete(int rc, long 
lastAddConfirmed, Object ctx) {
-                if (BKException.Code.OK == rc) {
-                    promise.setValue(lastAddConfirmed);
-                } else {
-                    promise.setException(BKException.create(rc));
-                }
-            }
-        }, null);
-        return promise;
-    }
-
-    /**
-     * Try read last confirmed.
-     *
-     * @param ledgerDesc
-     *          ledger descriptor
-     * @return last confirmed
-     * @throws BKException
-     */
-    public long tryReadLastConfirmed(LedgerDescriptor ledgerDesc) throws 
BKException {
-        return FutureUtils.bkResult(asyncTryReadLastConfirmed(ledgerDesc));
-    }
-
-    /**
-     * Async read last confirmed and entry
-     *
-     * @param ledgerDesc
-     *          ledger descriptor
-     * @param entryId
-     *          entry id to read
-     * @param timeOutInMillis
-     *          time out if no newer entry available
-     * @param parallel
-     *          whether to read from replicas in parallel
-     */
-    public Future<Pair<Long, LedgerEntry>> asyncReadLastConfirmedAndEntry(
-            LedgerDescriptor ledgerDesc,
-            long entryId,
-            long timeOutInMillis,
-            boolean parallel) {
-        RefCountedLedgerHandle refHandle = handlesMap.get(ledgerDesc);
-        if (null == refHandle) {
-            LOG.error("Accessing ledger {} without opening.", ledgerDesc);
-            return 
Future.exception(BKException.create(BKException.Code.UnexpectedConditionException));
-        }
-        final Promise<Pair<Long, LedgerEntry>> promise = new 
Promise<Pair<Long, LedgerEntry>>();
-        refHandle.handle.asyncReadLastConfirmedAndEntry(entryId, 
timeOutInMillis, parallel,
-                new AsyncCallback.ReadLastConfirmedAndEntryCallback() {
-                    @Override
-                    public void readLastConfirmedAndEntryComplete(int rc, long 
lac, LedgerEntry ledgerEntry, Object ctx) {
-                        if (BKException.Code.OK == rc) {
-                            promise.setValue(Pair.of(lac, ledgerEntry));
-                        } else {
-                            promise.setException(BKException.create(rc));
-                        }
-                    }
-                }, null);
-        return promise;
-    }
-
-    /**
-     * Async Read Entries
-     *
-     * @param ledgerDesc
-     *          ledger descriptor
-     * @param first
-     *          first entry
-     * @param last
-     *          second entry
-     */
-    public Future<Enumeration<LedgerEntry>> asyncReadEntries(
-            LedgerDescriptor ledgerDesc, long first, long last) {
-        RefCountedLedgerHandle refHandle = handlesMap.get(ledgerDesc);
-        if (null == refHandle) {
-            LOG.error("Accessing ledger {} without opening.", ledgerDesc);
-            return 
Future.exception(BKException.create(BKException.Code.UnexpectedConditionException));
-        }
-        final Promise<Enumeration<LedgerEntry>> promise = new 
Promise<Enumeration<LedgerEntry>>();
-        refHandle.handle.asyncReadEntries(first, last, new 
AsyncCallback.ReadCallback() {
-            @Override
-            public void readComplete(int rc, LedgerHandle lh, 
Enumeration<LedgerEntry> entries, Object ctx) {
-                if (BKException.Code.OK == rc) {
-                    promise.setValue(entries);
-                } else {
-                    promise.setException(BKException.create(rc));
-                }
-            }
-        }, null);
-        return promise;
-    }
-
-    public Enumeration<LedgerEntry> readEntries(LedgerDescriptor ledgerDesc, 
long first, long last)
-            throws BKException {
-        return FutureUtils.bkResult(asyncReadEntries(ledgerDesc, first, last));
-    }
-
-    public long getLength(LedgerDescriptor ledgerDesc) throws BKException {
-        RefCountedLedgerHandle refhandle = getLedgerHandle(ledgerDesc);
-
-        if (null == refhandle) {
-            LOG.error("Accessing ledger {} without opening.", ledgerDesc);
-            throw 
BKException.create(BKException.Code.UnexpectedConditionException);
-        }
-
-        return refhandle.handle.getLength();
-    }
-
-    public void clear() {
-        if (null != handlesMap) {
-            Iterator<Map.Entry<LedgerDescriptor, RefCountedLedgerHandle>> 
handlesMapIter = handlesMap.entrySet().iterator();
-            while (handlesMapIter.hasNext()) {
-                Map.Entry<LedgerDescriptor, RefCountedLedgerHandle> entry = 
handlesMapIter.next();
-                // Make it inaccessible through the map
-                handlesMapIter.remove();
-                // now close the ledger
-                entry.getValue().forceClose();
-            }
-        }
-    }
-
-    static class RefCountedLedgerHandle {
-        public final LedgerHandle handle;
-        final AtomicLong refcount = new AtomicLong(0);
-
-        RefCountedLedgerHandle(LedgerHandle lh) {
-            this.handle = lh;
-            addRef();
-        }
-
-        long getRefCount() {
-            return refcount.get();
-        }
-
-        public void addRef() {
-            refcount.incrementAndGet();
-        }
-
-        public boolean removeRef() {
-            return (refcount.decrementAndGet() == 0);
-        }
-
-        public void forceClose() {
-            try {
-                handle.close();
-            } catch (BKException.BKLedgerClosedException exc) {
-                // Ignore
-            } catch (Exception exc) {
-                LOG.warn("Exception while closing ledger {}", handle, exc);
-            }
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadCache.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadCache.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadCache.java
deleted file mode 100644
index 284b327..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadCache.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import java.io.IOException;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import com.google.common.base.Stopwatch;
-import com.google.common.base.Ticker;
-import com.twitter.distributedlog.callback.ReadAheadCallback;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
-import com.twitter.distributedlog.exceptions.InvalidEnvelopedEntryException;
-import com.twitter.distributedlog.exceptions.LogReadException;
-import org.apache.bookkeeper.client.LedgerEntry;
-import org.apache.bookkeeper.stats.AlertStatsLogger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ReadAheadCache {
-    static final Logger LOG = LoggerFactory.getLogger(ReadAheadCache.class);
-
-    private final String streamName;
-    private final LinkedBlockingQueue<Entry.Reader> readAheadEntries;
-    private final int maxCachedEntries;
-    private final AtomicReference<IOException> lastException = new 
AtomicReference<IOException>();
-    private final boolean deserializeRecordSet;
-    // callbacks
-    private final AsyncNotification notification;
-    private ReadAheadCallback readAheadCallback = null;
-
-    // variables for idle reader detection
-    private final Stopwatch lastEntryProcessTime;
-
-    private final AlertStatsLogger alertStatsLogger;
-
-    public ReadAheadCache(String streamName,
-                          AlertStatsLogger alertStatsLogger,
-                          AsyncNotification notification,
-                          int maxCachedRecords,
-                          boolean deserializeRecordSet,
-                          Ticker ticker) {
-        this.streamName = streamName;
-        this.maxCachedEntries = maxCachedRecords;
-        this.notification = notification;
-        this.deserializeRecordSet = deserializeRecordSet;
-
-        // create the readahead queue
-        readAheadEntries = new LinkedBlockingQueue<Entry.Reader>();
-
-        // start the idle reader detection
-        lastEntryProcessTime = Stopwatch.createStarted(ticker);
-
-        // Stats
-        this.alertStatsLogger = alertStatsLogger;
-    }
-
-    /**
-     * Trigger read ahead callback
-     */
-    private synchronized void invokeReadAheadCallback() {
-        if (null != readAheadCallback) {
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("Cache has space, schedule the read ahead");
-            }
-            readAheadCallback.resumeReadAhead();
-            readAheadCallback = null;
-        }
-    }
-
-    /**
-     * Register a readhead callback.
-     *
-     * @param readAheadCallback
-     *          read ahead callback
-     */
-    public synchronized void setReadAheadCallback(ReadAheadCallback 
readAheadCallback) {
-        this.readAheadCallback = readAheadCallback;
-        if (!isCacheFull()) {
-            invokeReadAheadCallback();
-        }
-    }
-
-    private void setLastException(IOException exc) {
-        lastException.set(exc);
-    }
-
-    /**
-     * Poll next entry from the readahead queue.
-     *
-     * @return next entry from readahead queue. null if no entries available 
in the queue.
-     * @throws IOException
-     */
-    public Entry.Reader getNextReadAheadEntry() throws IOException {
-        return getNextReadAheadEntry(0L, TimeUnit.MILLISECONDS);
-    }
-
-    public Entry.Reader getNextReadAheadEntry(long waitTime, TimeUnit 
waitTimeUnit) throws IOException {
-        if (null != lastException.get()) {
-            throw lastException.get();
-        }
-
-        Entry.Reader entry = null;
-        try {
-            entry = readAheadEntries.poll(waitTime, waitTimeUnit);
-        } catch (InterruptedException e) {
-            throw new DLInterruptedException("Interrupted on polling readahead entries : ", e);
-        }
-
-        if (null != entry) {
-            if (!isCacheFull()) {
-                invokeReadAheadCallback();
-            }
-        }
-
-        return entry;
-    }
-
-    /**
-     * Check whether the readahead becomes stall.
-     *
-     * @param idleReaderErrorThreshold
-     *          idle reader error threshold
-     * @param timeUnit
-     *          time unit of the idle reader error threshold
-     * @return true if the readahead becomes stall, otherwise false.
-     */
-    public boolean isReadAheadIdle(int idleReaderErrorThreshold, TimeUnit 
timeUnit) {
-        return (lastEntryProcessTime.elapsed(timeUnit) > 
idleReaderErrorThreshold);
-    }
-
-    /**
-     * Set an ledger entry to readahead cache
-     *
-     * @param key
-     *          read position of the entry
-     * @param entry
-     *          the ledger entry
-     * @param reason
-     *          the reason to add the entry to readahead (for logging)
-     * @param envelopeEntries
-     *          whether this entry an enveloped entries or not
-     * @param startSequenceId
-     *          the start sequence id
-     */
-    public void set(LedgerReadPosition key,
-                    LedgerEntry entry,
-                    String reason,
-                    boolean envelopeEntries,
-                    long startSequenceId) {
-        processNewLedgerEntry(key, entry, reason, envelopeEntries, startSequenceId);
-        lastEntryProcessTime.reset().start();
-        AsyncNotification n = notification;
-        if (null != n) {
-            n.notifyOnOperationComplete();
-        }
-    }
-
-    public boolean isCacheFull() {
-        return getNumCachedEntries() >= maxCachedEntries;
-    }
-
-    /**
-     * Return number cached records.
-     *
-     * @return number cached records.
-     */
-    public int getNumCachedEntries() {
-        return readAheadEntries.size();
-    }
-
-    /**
-     * Process the new ledger entry and propagate the records into readahead queue.
-     *
-     * @param readPosition
-     *          position of the ledger entry
-     * @param ledgerEntry
-     *          ledger entry
-     * @param reason
-     *          reason to add this ledger entry
-     * @param envelopeEntries
-     *          whether this entry is enveloped
-     * @param startSequenceId
-     *          the start sequence id of this log segment
-     */
-    private void processNewLedgerEntry(final LedgerReadPosition readPosition,
-                                       final LedgerEntry ledgerEntry,
-                                       final String reason,
-                                       boolean envelopeEntries,
-                                       long startSequenceId) {
-        try {
-            Entry.Reader reader = Entry.newBuilder()
-                    .setLogSegmentInfo(readPosition.getLogSegmentSequenceNumber(), startSequenceId)
-                    .setEntryId(ledgerEntry.getEntryId())
-                    .setEnvelopeEntry(envelopeEntries)
-                    .deserializeRecordSet(deserializeRecordSet)
-                    .setInputStream(ledgerEntry.getEntryInputStream())
-                    .buildReader();
-            readAheadEntries.add(reader);
-        } catch (InvalidEnvelopedEntryException ieee) {
-            alertStatsLogger.raise("Found invalid enveloped entry on stream {} : ", streamName, ieee);
-            setLastException(ieee);
-        } catch (IOException exc) {
-            setLastException(exc);
-        }
-    }
-
-    public void clear() {
-        readAheadEntries.clear();
-    }
-
-    @Override
-    public String toString() {
-        return String.format("%s: Num Cached Entries: %d",
-            streamName, getNumCachedEntries());
-    }
-}

Reply via email to