http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogHandler.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogHandler.java 
b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogHandler.java
new file mode 100644
index 0000000..07ae0ff
--- /dev/null
+++ 
b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogHandler.java
@@ -0,0 +1,715 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import com.google.common.base.Stopwatch;
+import org.apache.distributedlog.callback.LogSegmentNamesListener;
+import org.apache.distributedlog.exceptions.LogEmptyException;
+import org.apache.distributedlog.exceptions.LogSegmentNotFoundException;
+import org.apache.distributedlog.exceptions.UnexpectedException;
+import org.apache.distributedlog.logsegment.LogSegmentEntryStore;
+import org.apache.distributedlog.metadata.LogMetadata;
+import org.apache.distributedlog.io.AsyncAbortable;
+import org.apache.distributedlog.io.AsyncCloseable;
+import org.apache.distributedlog.logsegment.LogSegmentMetadataCache;
+import org.apache.distributedlog.logsegment.PerStreamLogSegmentCache;
+import org.apache.distributedlog.logsegment.LogSegmentFilter;
+import org.apache.distributedlog.logsegment.LogSegmentMetadataStore;
+import org.apache.distributedlog.metadata.LogStreamMetadataStore;
+import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.util.OrderedScheduler;
+import com.twitter.util.Function;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import com.twitter.util.Promise;
+import org.apache.bookkeeper.stats.AlertStatsLogger;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * The base class about log handler on managing log segments.
+ *
+ * <h3>Metrics</h3>
+ * The log handler is a base class on managing log segments. so all the metrics
+ * here are related to log segments retrieval and exposed under `logsegments`.
+ * These metrics are all OpStats, in the format of 
<code>`scope`/logsegments/`op`</code>.
+ * <p>
+ * Those operations are:
+ * <ul>
+ * <li>get_inprogress_segment: time between the inprogress log segment created 
and
+ * the handler read it.
+ * <li>get_completed_segment: time between a log segment is turned to 
completed and
+ * the handler read it.
+ * <li>negative_get_inprogress_segment: record the negative values for 
`get_inprogress_segment`.
+ * <li>negative_get_completed_segment: record the negative values for 
`get_completed_segment`.
+ * <li>recover_last_entry: recovering last entry from a log segment
+ * <li>recover_scanned_entries: the number of entries that are scanned during 
recovering.
+ * </ul>
+ * @see BKLogWriteHandler
+ * @see BKLogReadHandler
+ */
+public abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable {
+    static final Logger LOG = LoggerFactory.getLogger(BKLogHandler.class);
+
+    protected final LogMetadata logMetadata;
+    protected final DistributedLogConfiguration conf;
+    protected final LogStreamMetadataStore streamMetadataStore;
+    protected final LogSegmentMetadataStore metadataStore;
+    protected final LogSegmentMetadataCache metadataCache;
+    protected final LogSegmentEntryStore entryStore;
+    protected final int firstNumEntriesPerReadLastRecordScan;
+    protected final int maxNumEntriesPerReadLastRecordScan;
+    protected volatile long lastLedgerRollingTimeMillis = -1;
+    protected final OrderedScheduler scheduler;
+    protected final StatsLogger statsLogger;
+    protected final AlertStatsLogger alertStatsLogger;
+    protected volatile boolean reportGetSegmentStats = false;
+    private final String lockClientId;
+    protected final AtomicReference<IOException> metadataException = new 
AtomicReference<IOException>(null);
+
+    // Maintain the list of log segments per stream
+    protected final PerStreamLogSegmentCache logSegmentCache;
+
+    // trace
+    protected final long metadataLatencyWarnThresholdMillis;
+
+    // Stats
+    private final OpStatsLogger getInprogressSegmentStat;
+    private final OpStatsLogger getCompletedSegmentStat;
+    private final OpStatsLogger negativeGetInprogressSegmentStat;
+    private final OpStatsLogger negativeGetCompletedSegmentStat;
+    private final OpStatsLogger recoverLastEntryStats;
+    private final OpStatsLogger recoverScannedEntriesStats;
+
+    /**
+     * Construct a Bookkeeper journal manager.
+     */
+    BKLogHandler(LogMetadata metadata,
+                 DistributedLogConfiguration conf,
+                 LogStreamMetadataStore streamMetadataStore,
+                 LogSegmentMetadataCache metadataCache,
+                 LogSegmentEntryStore entryStore,
+                 OrderedScheduler scheduler,
+                 StatsLogger statsLogger,
+                 AlertStatsLogger alertStatsLogger,
+                 String lockClientId) {
+        this.logMetadata = metadata;
+        this.conf = conf;
+        this.scheduler = scheduler;
+        this.statsLogger = statsLogger;
+        this.alertStatsLogger = alertStatsLogger;
+        this.logSegmentCache = new PerStreamLogSegmentCache(
+                metadata.getLogName(),
+                conf.isLogSegmentSequenceNumberValidationEnabled());
+        firstNumEntriesPerReadLastRecordScan = 
conf.getFirstNumEntriesPerReadLastRecordScan();
+        maxNumEntriesPerReadLastRecordScan = 
conf.getMaxNumEntriesPerReadLastRecordScan();
+        this.streamMetadataStore = streamMetadataStore;
+        this.metadataStore = streamMetadataStore.getLogSegmentMetadataStore();
+        this.metadataCache = metadataCache;
+        this.entryStore = entryStore;
+        this.lockClientId = lockClientId;
+
+        // Traces
+        this.metadataLatencyWarnThresholdMillis = 
conf.getMetadataLatencyWarnThresholdMillis();
+
+        // Stats
+        StatsLogger segmentsLogger = statsLogger.scope("logsegments");
+        getInprogressSegmentStat = 
segmentsLogger.getOpStatsLogger("get_inprogress_segment");
+        getCompletedSegmentStat = 
segmentsLogger.getOpStatsLogger("get_completed_segment");
+        negativeGetInprogressSegmentStat = 
segmentsLogger.getOpStatsLogger("negative_get_inprogress_segment");
+        negativeGetCompletedSegmentStat = 
segmentsLogger.getOpStatsLogger("negative_get_completed_segment");
+        recoverLastEntryStats = 
segmentsLogger.getOpStatsLogger("recover_last_entry");
+        recoverScannedEntriesStats = 
segmentsLogger.getOpStatsLogger("recover_scanned_entries");
+    }
+
+    BKLogHandler checkMetadataException() throws IOException {
+        if (null != metadataException.get()) {
+            throw metadataException.get();
+        }
+        return this;
+    }
+
+    public void reportGetSegmentStats(boolean enabled) {
+        this.reportGetSegmentStats = enabled;
+    }
+
+    public String getLockClientId() {
+        return lockClientId;
+    }
+
+    public Future<LogRecordWithDLSN> asyncGetFirstLogRecord() {
+        final Promise<LogRecordWithDLSN> promise = new 
Promise<LogRecordWithDLSN>();
+        streamMetadataStore.logExists(logMetadata.getUri(), 
logMetadata.getLogName())
+                .addEventListener(new FutureEventListener<Void>() {
+            @Override
+            public void onSuccess(Void value) {
+                readLogSegmentsFromStore(
+                        LogSegmentMetadata.COMPARATOR,
+                        LogSegmentFilter.DEFAULT_FILTER,
+                        null
+                ).addEventListener(new 
FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
+
+                    @Override
+                    public void onSuccess(Versioned<List<LogSegmentMetadata>> 
ledgerList) {
+                        if (ledgerList.getValue().isEmpty()) {
+                            promise.setException(new LogEmptyException("Log " 
+ getFullyQualifiedName() + " has no records"));
+                            return;
+                        }
+                        Future<LogRecordWithDLSN> firstRecord = null;
+                        for (LogSegmentMetadata ledger : 
ledgerList.getValue()) {
+                            if (!ledger.isTruncated() && 
(ledger.getRecordCount() > 0 || ledger.isInProgress())) {
+                                firstRecord = asyncReadFirstUserRecord(ledger, 
DLSN.InitialDLSN);
+                                break;
+                            }
+                        }
+                        if (null != firstRecord) {
+                            promise.become(firstRecord);
+                        } else {
+                            promise.setException(new LogEmptyException("Log " 
+ getFullyQualifiedName() + " has no records"));
+                        }
+                    }
+
+                    @Override
+                    public void onFailure(Throwable cause) {
+                        promise.setException(cause);
+                    }
+                });
+            }
+
+            @Override
+            public void onFailure(Throwable cause) {
+                promise.setException(cause);
+            }
+        });
+        return promise;
+    }
+
+    public Future<LogRecordWithDLSN> getLastLogRecordAsync(final boolean 
recover, final boolean includeEndOfStream) {
+        final Promise<LogRecordWithDLSN> promise = new 
Promise<LogRecordWithDLSN>();
+        streamMetadataStore.logExists(logMetadata.getUri(), 
logMetadata.getLogName())
+                .addEventListener(new FutureEventListener<Void>() {
+            @Override
+            public void onSuccess(Void value) {
+                readLogSegmentsFromStore(
+                        LogSegmentMetadata.DESC_COMPARATOR,
+                        LogSegmentFilter.DEFAULT_FILTER,
+                        null
+                ).addEventListener(new 
FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
+
+                    @Override
+                    public void onSuccess(Versioned<List<LogSegmentMetadata>> 
ledgerList) {
+                        if (ledgerList.getValue().isEmpty()) {
+                            promise.setException(
+                                    new LogEmptyException("Log " + 
getFullyQualifiedName() + " has no records"));
+                            return;
+                        }
+                        asyncGetLastLogRecord(
+                                ledgerList.getValue().iterator(),
+                                promise,
+                                recover,
+                                false,
+                                includeEndOfStream);
+                    }
+
+                    @Override
+                    public void onFailure(Throwable cause) {
+                        promise.setException(cause);
+                    }
+                });
+            }
+
+            @Override
+            public void onFailure(Throwable cause) {
+                promise.setException(cause);
+            }
+        });
+        return promise;
+    }
+
+    private void asyncGetLastLogRecord(final Iterator<LogSegmentMetadata> 
ledgerIter,
+                                       final Promise<LogRecordWithDLSN> 
promise,
+                                       final boolean fence,
+                                       final boolean includeControlRecord,
+                                       final boolean includeEndOfStream) {
+        if (ledgerIter.hasNext()) {
+            LogSegmentMetadata metadata = ledgerIter.next();
+            asyncReadLastRecord(metadata, fence, includeControlRecord, 
includeEndOfStream).addEventListener(
+                    new FutureEventListener<LogRecordWithDLSN>() {
+                        @Override
+                        public void onSuccess(LogRecordWithDLSN record) {
+                            if (null == record) {
+                                asyncGetLastLogRecord(ledgerIter, promise, 
fence, includeControlRecord, includeEndOfStream);
+                            } else {
+                                promise.setValue(record);
+                            }
+                        }
+
+                        @Override
+                        public void onFailure(Throwable cause) {
+                            promise.setException(cause);
+                        }
+                    }
+            );
+        } else {
+            promise.setException(new LogEmptyException("Log " + 
getFullyQualifiedName() + " has no records"));
+        }
+    }
+
+    private Future<LogRecordWithDLSN> 
asyncReadFirstUserRecord(LogSegmentMetadata ledger, DLSN beginDLSN) {
+        return ReadUtils.asyncReadFirstUserRecord(
+                getFullyQualifiedName(),
+                ledger,
+                firstNumEntriesPerReadLastRecordScan,
+                maxNumEntriesPerReadLastRecordScan,
+                new AtomicInteger(0),
+                scheduler,
+                entryStore,
+                beginDLSN
+        );
+    }
+
+    /**
+     * This is a helper method to compactly return the record count between 
two records, the first denoted by
+     * beginDLSN and the second denoted by endPosition. Its up to the caller 
to ensure that endPosition refers to
+     * position in the same ledger as beginDLSN.
+     */
+    private Future<Long> asyncGetLogRecordCount(LogSegmentMetadata ledger, 
final DLSN beginDLSN, final long endPosition) {
+        return asyncReadFirstUserRecord(ledger, beginDLSN).map(new 
Function<LogRecordWithDLSN, Long>() {
+            public Long apply(final LogRecordWithDLSN beginRecord) {
+                long recordCount = 0;
+                if (null != beginRecord) {
+                    recordCount = endPosition + 1 - 
beginRecord.getLastPositionWithinLogSegment();
+                }
+                return recordCount;
+            }
+        });
+    }
+
+    /**
+     * Ledger metadata tells us how many records are in each completed 
segment, but for the first and last segments
+     * we may have to crack open the entry and count. For the first entry, we 
need to do so because beginDLSN may be
+     * an interior entry. For the last entry, if it is inprogress, we need to 
recover it and find the last user
+     * entry.
+     */
+    private Future<Long> asyncGetLogRecordCount(final LogSegmentMetadata 
ledger, final DLSN beginDLSN) {
+        if (ledger.isInProgress() && ledger.isDLSNinThisSegment(beginDLSN)) {
+            return asyncReadLastUserRecord(ledger).flatMap(new 
Function<LogRecordWithDLSN, Future<Long>>() {
+                public Future<Long> apply(final LogRecordWithDLSN endRecord) {
+                    if (null != endRecord) {
+                        return asyncGetLogRecordCount(ledger, beginDLSN, 
endRecord.getLastPositionWithinLogSegment() /* end position */);
+                    } else {
+                        return Future.value((long) 0);
+                    }
+                }
+            });
+        } else if (ledger.isInProgress()) {
+            return asyncReadLastUserRecord(ledger).map(new 
Function<LogRecordWithDLSN, Long>() {
+                public Long apply(final LogRecordWithDLSN endRecord) {
+                    if (null != endRecord) {
+                        return (long) 
endRecord.getLastPositionWithinLogSegment();
+                    } else {
+                        return (long) 0;
+                    }
+                }
+            });
+        } else if (ledger.isDLSNinThisSegment(beginDLSN)) {
+            return asyncGetLogRecordCount(ledger, beginDLSN, 
ledger.getRecordCount() /* end position */);
+        } else {
+            return Future.value((long) ledger.getRecordCount());
+        }
+    }
+
+    /**
+     * Get a count of records between beginDLSN and the end of the stream.
+     *
+     * @param beginDLSN dlsn marking the start of the range
+     * @return the count of records present in the range
+     */
+    public Future<Long> asyncGetLogRecordCount(final DLSN beginDLSN) {
+        return streamMetadataStore.logExists(logMetadata.getUri(), 
logMetadata.getLogName())
+                .flatMap(new Function<Void, Future<Long>>() {
+            public Future<Long> apply(Void done) {
+
+                return readLogSegmentsFromStore(
+                        LogSegmentMetadata.COMPARATOR,
+                        LogSegmentFilter.DEFAULT_FILTER,
+                        null
+                ).flatMap(new Function<Versioned<List<LogSegmentMetadata>>, 
Future<Long>>() {
+                    public Future<Long> 
apply(Versioned<List<LogSegmentMetadata>> ledgerList) {
+
+                        List<Future<Long>> futureCounts = new 
ArrayList<Future<Long>>(ledgerList.getValue().size());
+                        for (LogSegmentMetadata ledger : 
ledgerList.getValue()) {
+                            if (ledger.getLogSegmentSequenceNumber() >= 
beginDLSN.getLogSegmentSequenceNo()) {
+                                
futureCounts.add(asyncGetLogRecordCount(ledger, beginDLSN));
+                            }
+                        }
+                        return Future.collect(futureCounts).map(new 
Function<List<Long>, Long>() {
+                            public Long apply(List<Long> counts) {
+                                return sum(counts);
+                            }
+                        });
+                    }
+                });
+            }
+        });
+    }
+
+    private Long sum(List<Long> values) {
+        long sum = 0;
+        for (Long value : values) {
+            sum += value;
+        }
+        return sum;
+    }
+
+    @Override
+    public Future<Void> asyncAbort() {
+        return asyncClose();
+    }
+
+    public Future<LogRecordWithDLSN> asyncReadLastUserRecord(final 
LogSegmentMetadata l) {
+        return asyncReadLastRecord(l, false, false, false);
+    }
+
+    public Future<LogRecordWithDLSN> asyncReadLastRecord(final 
LogSegmentMetadata l,
+                                                         final boolean fence,
+                                                         final boolean 
includeControl,
+                                                         final boolean 
includeEndOfStream) {
+        final AtomicInteger numRecordsScanned = new AtomicInteger(0);
+        final Stopwatch stopwatch = Stopwatch.createStarted();
+        return ReadUtils.asyncReadLastRecord(
+                getFullyQualifiedName(),
+                l,
+                fence,
+                includeControl,
+                includeEndOfStream,
+                firstNumEntriesPerReadLastRecordScan,
+                maxNumEntriesPerReadLastRecordScan,
+                numRecordsScanned,
+                scheduler,
+                entryStore
+        ).addEventListener(new FutureEventListener<LogRecordWithDLSN>() {
+            @Override
+            public void onSuccess(LogRecordWithDLSN value) {
+                
recoverLastEntryStats.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+                
recoverScannedEntriesStats.registerSuccessfulEvent(numRecordsScanned.get());
+            }
+
+            @Override
+            public void onFailure(Throwable cause) {
+                
recoverLastEntryStats.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+            }
+        });
+    }
+
+    protected void setLastLedgerRollingTimeMillis(long rollingTimeMillis) {
+        if (lastLedgerRollingTimeMillis < rollingTimeMillis) {
+            lastLedgerRollingTimeMillis = rollingTimeMillis;
+        }
+    }
+
+    public String getFullyQualifiedName() {
+        return logMetadata.getFullyQualifiedName();
+    }
+
+    // Log Segments Related Functions
+    //
+    // ***Note***
+    // Get log segment list should go through #getCachedLogSegments as we need 
to assign start sequence id
+    // for inprogress log segment so the reader could generate the right 
sequence id.
+    //
+    // ***PerStreamCache vs LogSegmentMetadataCache **
+    // The per stream cache maintains the list of segments per stream, while 
the metadata cache
+    // maintains log segments. The metadata cache is just to reduce the access 
to zookeeper, it is
+    // okay that some of the log segments are not in the cache; however the 
per stream cache can not
+    // have any gaps between log segment sequence numbers which it has to be 
accurate.
+
+    /**
+     * Get the cached log segments.
+     *
+     * @param comparator the comparator to sort the returned log segments.
+     * @return list of sorted log segments
+     * @throws UnexpectedException if unexpected condition detected.
+     */
+    protected List<LogSegmentMetadata> 
getCachedLogSegments(Comparator<LogSegmentMetadata> comparator)
+        throws UnexpectedException {
+        try {
+            return logSegmentCache.getLogSegments(comparator);
+        } catch (UnexpectedException ue) {
+            // the log segments cache went wrong
+            LOG.error("Unexpected exception on getting log segments from the 
cache for stream {}",
+                    getFullyQualifiedName(), ue);
+            metadataException.compareAndSet(null, ue);
+            throw ue;
+        }
+    }
+
+    /**
+     * Add the segment <i>metadata</i> for <i>name</i> in the cache.
+     *
+     * @param name
+     *          segment znode name.
+     * @param metadata
+     *          segment metadata.
+     */
+    protected void addLogSegmentToCache(String name, LogSegmentMetadata 
metadata) {
+        metadataCache.put(metadata.getZkPath(), metadata);
+        logSegmentCache.add(name, metadata);
+        // update the last ledger rolling time
+        if (!metadata.isInProgress() && (lastLedgerRollingTimeMillis < 
metadata.getCompletionTime())) {
+            lastLedgerRollingTimeMillis = metadata.getCompletionTime();
+        }
+
+        if (reportGetSegmentStats) {
+            // update stats
+            long ts = System.currentTimeMillis();
+            if (metadata.isInProgress()) {
+                // as we used timestamp as start tx id we could take it as 
start time
+                // NOTE: it is a hack here.
+                long elapsedMillis = ts - metadata.getFirstTxId();
+                long elapsedMicroSec = 
TimeUnit.MILLISECONDS.toMicros(elapsedMillis);
+                if (elapsedMicroSec > 0) {
+                    if (elapsedMillis > metadataLatencyWarnThresholdMillis) {
+                        LOG.warn("{} received inprogress log segment in {} 
millis: {}",
+                                 new Object[] { getFullyQualifiedName(), 
elapsedMillis, metadata });
+                    }
+                    
getInprogressSegmentStat.registerSuccessfulEvent(elapsedMicroSec);
+                } else {
+                    
negativeGetInprogressSegmentStat.registerSuccessfulEvent(-elapsedMicroSec);
+                }
+            } else {
+                long elapsedMillis = ts - metadata.getCompletionTime();
+                long elapsedMicroSec = 
TimeUnit.MILLISECONDS.toMicros(elapsedMillis);
+                if (elapsedMicroSec > 0) {
+                    if (elapsedMillis > metadataLatencyWarnThresholdMillis) {
+                        LOG.warn("{} received completed log segment in {} 
millis : {}",
+                                 new Object[] { getFullyQualifiedName(), 
elapsedMillis, metadata });
+                    }
+                    
getCompletedSegmentStat.registerSuccessfulEvent(elapsedMicroSec);
+                } else {
+                    
negativeGetCompletedSegmentStat.registerSuccessfulEvent(-elapsedMicroSec);
+                }
+            }
+        }
+    }
+
+    /**
+     * Read log segment <i>name</i> from the cache.
+     *
+     * @param name name of the log segment
+     * @return log segment metadata
+     */
+    protected LogSegmentMetadata readLogSegmentFromCache(String name) {
+        return logSegmentCache.get(name);
+    }
+
+    /**
+     * Remove the log segment <i>name</i> from the cache.
+     *
+     * @param name name of the log segment.
+     * @return log segment metadata
+     */
+    protected LogSegmentMetadata removeLogSegmentFromCache(String name) {
+        metadataCache.invalidate(name);
+        return logSegmentCache.remove(name);
+    }
+
+    /**
+     * Update the log segment cache with updated mapping
+     *
+     * @param logSegmentsRemoved log segments removed
+     * @param logSegmentsAdded log segments added
+     */
+    protected void updateLogSegmentCache(Set<String> logSegmentsRemoved,
+                                         Map<String, LogSegmentMetadata> 
logSegmentsAdded) {
+        for (String segmentName : logSegmentsRemoved) {
+            metadataCache.invalidate(segmentName);
+        }
+        for (Map.Entry<String, LogSegmentMetadata> entry : 
logSegmentsAdded.entrySet()) {
+            metadataCache.put(entry.getKey(), entry.getValue());
+        }
+        logSegmentCache.update(logSegmentsRemoved, logSegmentsAdded);
+    }
+
+    /**
+     * Read the log segments from the store and register a listener
+     * @param comparator
+     * @param segmentFilter
+     * @param logSegmentNamesListener
+     * @return future represents the result of log segments
+     */
+    public Future<Versioned<List<LogSegmentMetadata>>> 
readLogSegmentsFromStore(
+            final Comparator<LogSegmentMetadata> comparator,
+            final LogSegmentFilter segmentFilter,
+            final LogSegmentNamesListener logSegmentNamesListener) {
+        final Promise<Versioned<List<LogSegmentMetadata>>> readResult =
+                new Promise<Versioned<List<LogSegmentMetadata>>>();
+        metadataStore.getLogSegmentNames(logMetadata.getLogSegmentsPath(), 
logSegmentNamesListener)
+                .addEventListener(new 
FutureEventListener<Versioned<List<String>>>() {
+                    @Override
+                    public void onFailure(Throwable cause) {
+                        FutureUtils.setException(readResult, cause);
+                    }
+
+                    @Override
+                    public void onSuccess(Versioned<List<String>> 
logSegmentNames) {
+                        readLogSegmentsFromStore(logSegmentNames, comparator, 
segmentFilter, readResult);
+                    }
+                });
+        return readResult;
+    }
+
+    protected void readLogSegmentsFromStore(final Versioned<List<String>> 
logSegmentNames,
+                                            final 
Comparator<LogSegmentMetadata> comparator,
+                                            final LogSegmentFilter 
segmentFilter,
+                                            final 
Promise<Versioned<List<LogSegmentMetadata>>> readResult) {
+        Set<String> segmentsReceived = new HashSet<String>();
+        
segmentsReceived.addAll(segmentFilter.filter(logSegmentNames.getValue()));
+        Set<String> segmentsAdded;
+        final Set<String> removedSegments = Collections.synchronizedSet(new 
HashSet<String>());
+        final Map<String, LogSegmentMetadata> addedSegments =
+                Collections.synchronizedMap(new HashMap<String, 
LogSegmentMetadata>());
+        Pair<Set<String>, Set<String>> segmentChanges = 
logSegmentCache.diff(segmentsReceived);
+        segmentsAdded = segmentChanges.getLeft();
+        removedSegments.addAll(segmentChanges.getRight());
+
+        if (segmentsAdded.isEmpty()) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("No segments added for {}.", 
getFullyQualifiedName());
+            }
+
+            // update the cache before #getCachedLogSegments to return
+            updateLogSegmentCache(removedSegments, addedSegments);
+
+            List<LogSegmentMetadata> segmentList;
+            try {
+                segmentList = getCachedLogSegments(comparator);
+            } catch (UnexpectedException e) {
+                FutureUtils.setException(readResult, e);
+                return;
+            }
+
+            FutureUtils.setValue(readResult,
+                    new Versioned<List<LogSegmentMetadata>>(segmentList, 
logSegmentNames.getVersion()));
+            return;
+        }
+
+        final AtomicInteger numChildren = new 
AtomicInteger(segmentsAdded.size());
+        final AtomicInteger numFailures = new AtomicInteger(0);
+        for (final String segment: segmentsAdded) {
+            String logSegmentPath = logMetadata.getLogSegmentPath(segment);
+            LogSegmentMetadata cachedSegment = 
metadataCache.get(logSegmentPath);
+            if (null != cachedSegment) {
+                addedSegments.put(segment, cachedSegment);
+                completeReadLogSegmentsFromStore(
+                        removedSegments,
+                        addedSegments,
+                        comparator,
+                        readResult,
+                        logSegmentNames.getVersion(),
+                        numChildren,
+                        numFailures);
+                continue;
+            }
+            metadataStore.getLogSegment(logSegmentPath)
+                    .addEventListener(new 
FutureEventListener<LogSegmentMetadata>() {
+
+                        @Override
+                        public void onSuccess(LogSegmentMetadata result) {
+                            addedSegments.put(segment, result);
+                            complete();
+                        }
+
+                        @Override
+                        public void onFailure(Throwable cause) {
+                            // LogSegmentNotFoundException exception is 
possible in two cases
+                            // 1. A log segment was deleted by truncation 
between the call to getChildren and read
+                            // attempt on the znode corresponding to the 
segment
+                            // 2. In progress segment has been completed => 
inprogress ZNode does not exist
+                            if (cause instanceof LogSegmentNotFoundException) {
+                                removedSegments.add(segment);
+                                complete();
+                            } else {
+                                // fail fast
+                                if (1 == numFailures.incrementAndGet()) {
+                                    FutureUtils.setException(readResult, 
cause);
+                                    return;
+                                }
+                            }
+                        }
+
+                        private void complete() {
+                            completeReadLogSegmentsFromStore(
+                                    removedSegments,
+                                    addedSegments,
+                                    comparator,
+                                    readResult,
+                                    logSegmentNames.getVersion(),
+                                    numChildren,
+                                    numFailures);
+                        }
+                    });
+        }
+    }
+
+    private void completeReadLogSegmentsFromStore(final Set<String> 
removedSegments,
+                                                  final Map<String, 
LogSegmentMetadata> addedSegments,
+                                                  final 
Comparator<LogSegmentMetadata> comparator,
+                                                  final 
Promise<Versioned<List<LogSegmentMetadata>>> readResult,
+                                                  final Version 
logSegmentNamesVersion,
+                                                  final AtomicInteger 
numChildren,
+                                                  final AtomicInteger 
numFailures) {
+        if (0 != numChildren.decrementAndGet()) {
+            return;
+        }
+        if (numFailures.get() > 0) {
+            return;
+        }
+        // update the cache only when fetch completed and before 
#getCachedLogSegments
+        updateLogSegmentCache(removedSegments, addedSegments);
+        List<LogSegmentMetadata> segmentList;
+        try {
+            segmentList = getCachedLogSegments(comparator);
+        } catch (UnexpectedException e) {
+            FutureUtils.setException(readResult, e);
+            return;
+        }
+        FutureUtils.setValue(readResult,
+            new Versioned<List<LogSegmentMetadata>>(segmentList, 
logSegmentNamesVersion));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogReadHandler.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogReadHandler.java
 
b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogReadHandler.java
new file mode 100644
index 0000000..c6e2e07
--- /dev/null
+++ 
b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogReadHandler.java
@@ -0,0 +1,431 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import org.apache.distributedlog.callback.LogSegmentListener;
+import org.apache.distributedlog.callback.LogSegmentNamesListener;
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+import org.apache.distributedlog.exceptions.DLIllegalStateException;
+import org.apache.distributedlog.exceptions.LockingException;
+import org.apache.distributedlog.exceptions.LogNotFoundException;
+import org.apache.distributedlog.exceptions.LogSegmentNotFoundException;
+import org.apache.distributedlog.exceptions.UnexpectedException;
+import org.apache.distributedlog.logsegment.LogSegmentEntryStore;
+import org.apache.distributedlog.metadata.LogMetadataForReader;
+import org.apache.distributedlog.lock.DistributedLock;
+import org.apache.distributedlog.logsegment.LogSegmentFilter;
+import org.apache.distributedlog.logsegment.LogSegmentMetadataCache;
+import org.apache.distributedlog.metadata.LogStreamMetadataStore;
+import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.util.OrderedScheduler;
+import org.apache.distributedlog.util.Utils;
+import com.twitter.util.ExceptionalFunction;
+import com.twitter.util.Function;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import com.twitter.util.Promise;
+import com.twitter.util.Return;
+import com.twitter.util.Throw;
+import com.twitter.util.Try;
+import org.apache.bookkeeper.stats.AlertStatsLogger;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.SafeRunnable;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.AbstractFunction1;
+import scala.runtime.BoxedUnit;
+
+import javax.annotation.Nullable;
+
+/**
+ * Log Handler for Readers.
+ * <h3>Metrics</h3>
+ *
+ * <h4>ReadAhead Worker</h4>
+ * Most of readahead stats are exposed under scope `readahead_worker`. Only 
readahead exceptions are exposed
+ * in parent scope via <code>readAheadExceptionsLogger</code>.
+ * <ul>
+ * <li> `readahead_worker`/wait: counter. number of waits that readahead 
worker is waiting. If this keeps increasing,
+ * it usually means readahead keep getting full because of reader slows down 
reading.
+ * <li> `readahead_worker`/repositions: counter. number of repositions that 
readhead worker encounters. reposition
+ * means that a readahead worker finds that it isn't advancing to a new log 
segment and force re-positioning.
+ * <li> `readahead_worker`/entry_piggy_back_hits: counter. it increases when 
the last add confirmed being advanced
+ * because of the piggy-back lac.
+ * <li> `readahead_worker`/entry_piggy_back_misses: counter. it increases when 
the last add confirmed isn't advanced
+ * by a read entry because it doesn't piggy back a newer lac.
+ * <li> `readahead_worker`/read_entries: opstats. stats on number of entries 
read per readahead read batch.
+ * <li> `readahead_worker`/read_lac_counter: counter. stats on the number of 
readLastConfirmed operations
+ * <li> `readahead_worker`/read_lac_and_entry_counter: counter. stats on the 
number of readLastConfirmedAndEntry
+ * operations.
+ * <li> `readahead_worker`/cache_full: counter. it increases each time 
readahead worker finds cache become full.
+ * If it keeps increasing, that means reader slows down reading.
+ * <li> `readahead_worker`/resume: opstats. stats on readahead worker resuming 
reading from wait state.
+ * <li> `readahead_worker`/read_lac_lag: opstats. stats on the number of 
entries diff between the lac reader knew
+ * last time and the lac that it received. if `lag` between two subsequent 
lacs is high, that might means delay
+ * might be high. because reader is only allowed to read entries after lac is 
advanced.
+ * <li> `readahead_worker`/long_poll_interruption: opstats. stats on the 
number of interruptions happened to long
+ * poll. the interruptions are usually because of receiving zookeeper 
notifications.
+ * <li> `readahead_worker`/notification_execution: opstats. stats on 
executions over the notifications received from
+ * zookeeper.
+ * <li> `readahead_worker`/metadata_reinitialization: opstats. stats on 
metadata reinitialization after receiving
+ * notifcation from log segments updates.
+ * <li> `readahead_worker`/idle_reader_warn: counter. it increases each time 
the readahead worker detects itself
+ * becoming idle.
+ * </ul>
+ * <h4>Read Lock</h4>
+ * All read lock related stats are exposed under scope `read_lock`.
+ * for detail stats.
+ */
+class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener 
{
+    static final Logger LOG = LoggerFactory.getLogger(BKLogReadHandler.class);
+
+    protected final LogMetadataForReader logMetadataForReader;
+
+    protected final DynamicDistributedLogConfiguration dynConf;
+
+    private final Optional<String> subscriberId;
+    private DistributedLock readLock;
+    private Future<Void> lockAcquireFuture;
+
+    // notify the state change about the read handler
+    protected final AsyncNotification readerStateNotification;
+
+    // log segments listener
+    protected boolean logSegmentsNotificationDisabled = false;
+    protected final CopyOnWriteArraySet<LogSegmentListener> listeners =
+            new CopyOnWriteArraySet<LogSegmentListener>();
+    protected Versioned<List<LogSegmentMetadata>> lastNotifiedLogSegments =
+            new Versioned<List<LogSegmentMetadata>>(null, Version.NEW);
+
+    // stats
+    private final StatsLogger perLogStatsLogger;
+
+    /**
+     * Construct a Bookkeeper journal manager.
+     */
+    BKLogReadHandler(LogMetadataForReader logMetadata,
+                     Optional<String> subscriberId,
+                     DistributedLogConfiguration conf,
+                     DynamicDistributedLogConfiguration dynConf,
+                     LogStreamMetadataStore streamMetadataStore,
+                     LogSegmentMetadataCache metadataCache,
+                     LogSegmentEntryStore entryStore,
+                     OrderedScheduler scheduler,
+                     AlertStatsLogger alertStatsLogger,
+                     StatsLogger statsLogger,
+                     StatsLogger perLogStatsLogger,
+                     String clientId,
+                     AsyncNotification readerStateNotification,
+                     boolean isHandleForReading) {
+        super(logMetadata,
+                conf,
+                streamMetadataStore,
+                metadataCache,
+                entryStore,
+                scheduler,
+                statsLogger,
+                alertStatsLogger,
+                clientId);
+        this.logMetadataForReader = logMetadata;
+        this.dynConf = dynConf;
+        this.perLogStatsLogger =
+                isHandleForReading ? perLogStatsLogger : 
NullStatsLogger.INSTANCE;
+        this.readerStateNotification = readerStateNotification;
+        this.subscriberId = subscriberId;
+    }
+
+    @VisibleForTesting
+    String getReadLockPath() {
+        return logMetadataForReader.getReadLockPath(subscriberId);
+    }
+
+    <T> void satisfyPromiseAsync(final Promise<T> promise, final Try<T> 
result) {
+        scheduler.submit(new SafeRunnable() {
+            @Override
+            public void safeRun() {
+                promise.update(result);
+            }
+        });
+    }
+
+    Future<Void> checkLogStreamExists() {
+        return streamMetadataStore.logExists(logMetadata.getUri(), 
logMetadata.getLogName());
+    }
+
+    /**
+     * Elective stream lock--readers are not required to acquire the lock 
before using the stream.
+     */
+    synchronized Future<Void> lockStream() {
+        if (null == lockAcquireFuture) {
+            lockAcquireFuture = 
streamMetadataStore.createReadLock(logMetadataForReader, subscriberId)
+                    .flatMap(new ExceptionalFunction<DistributedLock, 
Future<Void>>() {
+                        @Override
+                        public Future<Void> applyE(DistributedLock lock) 
throws Throwable {
+                            BKLogReadHandler.this.readLock = lock;
+                            LOG.info("acquiring readlock {} at {}", 
getLockClientId(), getReadLockPath());
+                            return acquireLockOnExecutorThread(lock);
+                        }
+                    });
+        }
+        return lockAcquireFuture;
+    }
+
+    /**
+     * Begin asynchronous lock acquire, but ensure that the returned future is 
satisfied on an
+     * executor service thread.
+     */
+    Future<Void> acquireLockOnExecutorThread(DistributedLock lock) throws 
LockingException {
+        final Future<? extends DistributedLock> acquireFuture = 
lock.asyncAcquire();
+
+        // The future we return must be satisfied on an executor service 
thread. If we simply
+        // return the future returned by asyncAcquire, user callbacks may end 
up running in
+        // the lock state executor thread, which will cause deadlocks and 
introduce latency
+        // etc.
+        final Promise<Void> threadAcquirePromise = new Promise<Void>();
+        threadAcquirePromise.setInterruptHandler(new Function<Throwable, 
BoxedUnit>() {
+            @Override
+            public BoxedUnit apply(Throwable t) {
+                FutureUtils.cancel(acquireFuture);
+                return null;
+            }
+        });
+        acquireFuture.addEventListener(new 
FutureEventListener<DistributedLock>() {
+            @Override
+            public void onSuccess(DistributedLock lock) {
+                LOG.info("acquired readlock {} at {}", getLockClientId(), 
getReadLockPath());
+                satisfyPromiseAsync(threadAcquirePromise, new 
Return<Void>(null));
+            }
+
+            @Override
+            public void onFailure(Throwable cause) {
+                LOG.info("failed to acquire readlock {} at {}",
+                        new Object[]{ getLockClientId(), getReadLockPath(), 
cause });
+                satisfyPromiseAsync(threadAcquirePromise, new 
Throw<Void>(cause));
+            }
+        });
+        return threadAcquirePromise;
+    }
+
+    /**
+     * Check ownership of elective stream lock.
+     */
+    void checkReadLock() throws DLIllegalStateException, LockingException {
+        synchronized (this) {
+            if ((null == lockAcquireFuture) ||
+                (!lockAcquireFuture.isDefined())) {
+                throw new DLIllegalStateException("Attempt to check for lock 
before it has been acquired successfully");
+            }
+        }
+
+        readLock.checkOwnership();
+    }
+
+    public Future<Void> asyncClose() {
+        DistributedLock lockToClose;
+        synchronized (this) {
+            if (null != lockAcquireFuture && !lockAcquireFuture.isDefined()) {
+                FutureUtils.cancel(lockAcquireFuture);
+            }
+            lockToClose = readLock;
+        }
+        return Utils.closeSequence(scheduler, lockToClose)
+                .flatMap(new AbstractFunction1<Void, Future<Void>>() {
+            @Override
+            public Future<Void> apply(Void result) {
+                // unregister the log segment listener
+                
metadataStore.unregisterLogSegmentListener(logMetadata.getLogSegmentsPath(), 
BKLogReadHandler.this);
+                return Future.Void();
+            }
+        });
+    }
+
+    @Override
+    public Future<Void> asyncAbort() {
+        return asyncClose();
+    }
+
+    /**
+     * Start fetch the log segments and register the {@link 
LogSegmentNamesListener}.
+     * The future is satisfied only on a successful fetch or encountered a 
fatal failure.
+     *
+     * @return future represents the fetch result
+     */
+    Future<Versioned<List<LogSegmentMetadata>>> asyncStartFetchLogSegments() {
+        Promise<Versioned<List<LogSegmentMetadata>>> promise =
+                new Promise<Versioned<List<LogSegmentMetadata>>>();
+        asyncStartFetchLogSegments(promise);
+        return promise;
+    }
+
+    void asyncStartFetchLogSegments(final 
Promise<Versioned<List<LogSegmentMetadata>>> promise) {
+        readLogSegmentsFromStore(
+                LogSegmentMetadata.COMPARATOR,
+                LogSegmentFilter.DEFAULT_FILTER,
+                this).addEventListener(new 
FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
+            @Override
+            public void onFailure(Throwable cause) {
+                if (cause instanceof LogNotFoundException ||
+                        cause instanceof LogSegmentNotFoundException ||
+                        cause instanceof UnexpectedException) {
+                    // indicate some inconsistent behavior, abort
+                    metadataException.compareAndSet(null, (IOException) cause);
+                    // notify the reader that read handler is in error state
+                    notifyReaderOnError(cause);
+                    FutureUtils.setException(promise, cause);
+                    return;
+                }
+                scheduler.schedule(new Runnable() {
+                    @Override
+                    public void run() {
+                        asyncStartFetchLogSegments(promise);
+                    }
+                }, conf.getZKRetryBackoffMaxMillis(), TimeUnit.MILLISECONDS);
+            }
+
+            @Override
+            public void onSuccess(Versioned<List<LogSegmentMetadata>> 
segments) {
+                // no-op
+                FutureUtils.setValue(promise, segments);
+            }
+        });
+    }
+
+    @VisibleForTesting
+    void disableReadAheadLogSegmentsNotification() {
+        logSegmentsNotificationDisabled = true;
+    }
+
+    @Override
+    public void onSegmentsUpdated(final Versioned<List<String>> segments) {
+        synchronized (this) {
+            if (lastNotifiedLogSegments.getVersion() != Version.NEW &&
+                    
lastNotifiedLogSegments.getVersion().compare(segments.getVersion()) != 
Version.Occurred.BEFORE) {
+                // the log segments has been read, and it is possibly a retry 
from last segments update
+                return;
+            }
+        }
+
+        Promise<Versioned<List<LogSegmentMetadata>>> readLogSegmentsPromise =
+                new Promise<Versioned<List<LogSegmentMetadata>>>();
+        readLogSegmentsPromise.addEventListener(new 
FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
+            @Override
+            public void onFailure(Throwable cause) {
+                if (cause instanceof LogNotFoundException ||
+                        cause instanceof LogSegmentNotFoundException ||
+                        cause instanceof UnexpectedException) {
+                    // indicate some inconsistent behavior, abort
+                    metadataException.compareAndSet(null, (IOException) cause);
+                    // notify the reader that read handler is in error state
+                    notifyReaderOnError(cause);
+                    return;
+                }
+                scheduler.schedule(new Runnable() {
+                    @Override
+                    public void run() {
+                        onSegmentsUpdated(segments);
+                    }
+                }, conf.getZKRetryBackoffMaxMillis(), TimeUnit.MILLISECONDS);
+            }
+
+            @Override
+            public void onSuccess(Versioned<List<LogSegmentMetadata>> 
logSegments) {
+                List<LogSegmentMetadata> segmentsToNotify = null;
+                synchronized (BKLogReadHandler.this) {
+                    Versioned<List<LogSegmentMetadata>> lastLogSegments = 
lastNotifiedLogSegments;
+                    if (lastLogSegments.getVersion() == Version.NEW ||
+                            
lastLogSegments.getVersion().compare(logSegments.getVersion()) == 
Version.Occurred.BEFORE) {
+                        lastNotifiedLogSegments = logSegments;
+                        segmentsToNotify = logSegments.getValue();
+                    }
+                }
+                if (null != segmentsToNotify) {
+                    notifyUpdatedLogSegments(segmentsToNotify);
+                }
+            }
+        });
+        // log segments list is updated, read their metadata
+        readLogSegmentsFromStore(
+                segments,
+                LogSegmentMetadata.COMPARATOR,
+                LogSegmentFilter.DEFAULT_FILTER,
+                readLogSegmentsPromise);
+    }
+
+    @Override
+    public void onLogStreamDeleted() {
+        notifyLogStreamDeleted();
+    }
+
+    //
+    // Listener for log segments
+    //
+
+    protected void registerListener(@Nullable LogSegmentListener listener) {
+        if (null != listener) {
+            listeners.add(listener);
+        }
+    }
+
+    protected void unregisterListener(@Nullable LogSegmentListener listener) {
+        if (null != listener) {
+            listeners.remove(listener);
+        }
+    }
+
+    protected void notifyUpdatedLogSegments(List<LogSegmentMetadata> segments) 
{
+        if (logSegmentsNotificationDisabled) {
+            return;
+        }
+
+        for (LogSegmentListener listener : listeners) {
+            List<LogSegmentMetadata> listToReturn =
+                    new ArrayList<LogSegmentMetadata>(segments);
+            Collections.sort(listToReturn, LogSegmentMetadata.COMPARATOR);
+            listener.onSegmentsUpdated(listToReturn);
+        }
+    }
+
+    protected void notifyLogStreamDeleted() {
+        if (logSegmentsNotificationDisabled) {
+            return;
+        }
+
+        for (LogSegmentListener listener : listeners) {
+            listener.onLogStreamDeleted();
+        }
+    }
+
+    // notify the errors
+    protected void notifyReaderOnError(Throwable cause) {
+        if (null != readerStateNotification) {
+            readerStateNotification.notifyOnError(cause);
+        }
+    }
+}

Reply via email to