http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
deleted file mode 100644
index 0cf8ed5..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
+++ /dev/null
@@ -1,715 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import com.google.common.base.Stopwatch;
-import com.twitter.distributedlog.callback.LogSegmentNamesListener;
-import com.twitter.distributedlog.exceptions.LogEmptyException;
-import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException;
-import com.twitter.distributedlog.exceptions.UnexpectedException;
-import com.twitter.distributedlog.logsegment.LogSegmentEntryStore;
-import com.twitter.distributedlog.metadata.LogMetadata;
-import com.twitter.distributedlog.io.AsyncAbortable;
-import com.twitter.distributedlog.io.AsyncCloseable;
-import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache;
-import com.twitter.distributedlog.logsegment.PerStreamLogSegmentCache;
-import com.twitter.distributedlog.logsegment.LogSegmentFilter;
-import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
-import com.twitter.distributedlog.metadata.LogStreamMetadataStore;
-import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.distributedlog.util.OrderedScheduler;
-import com.twitter.util.Function;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import org.apache.bookkeeper.stats.AlertStatsLogger;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.versioning.Version;
-import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.commons.lang3.tuple.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * The base class for log handlers that manage log segments.
- *
- * <h3>Metrics</h3>
- * The log handler is a base class for managing log segments, so all the metrics
- * here are related to log segment retrieval and are exposed under `logsegments`.
- * These metrics are all OpStats, in the format of <code>`scope`/logsegments/`op`</code>.
- * <p>
- * Those operations are:
- * <ul>
- * <li>get_inprogress_segment: time between an inprogress log segment being created and
- * the handler reading it.
- * <li>get_completed_segment: time between a log segment being completed and
- * the handler reading it.
- * <li>negative_get_inprogress_segment: records the negative values for `get_inprogress_segment`.
- * <li>negative_get_completed_segment: records the negative values for `get_completed_segment`.
- * <li>recover_last_entry: recovering the last entry from a log segment.
- * <li>recover_scanned_entries: the number of entries scanned during recovery.
- * </ul>
- * @see BKLogWriteHandler
- * @see BKLogReadHandler
- */
-public abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable {
-    static final Logger LOG = LoggerFactory.getLogger(BKLogHandler.class);
-
-    protected final LogMetadata logMetadata;
-    protected final DistributedLogConfiguration conf;
-    protected final LogStreamMetadataStore streamMetadataStore;
-    protected final LogSegmentMetadataStore metadataStore;
-    protected final LogSegmentMetadataCache metadataCache;
-    protected final LogSegmentEntryStore entryStore;
-    protected final int firstNumEntriesPerReadLastRecordScan;
-    protected final int maxNumEntriesPerReadLastRecordScan;
-    protected volatile long lastLedgerRollingTimeMillis = -1;
-    protected final OrderedScheduler scheduler;
-    protected final StatsLogger statsLogger;
-    protected final AlertStatsLogger alertStatsLogger;
-    protected volatile boolean reportGetSegmentStats = false;
-    private final String lockClientId;
-    protected final AtomicReference<IOException> metadataException = new AtomicReference<IOException>(null);
-
-    // Maintain the list of log segments per stream
-    protected final PerStreamLogSegmentCache logSegmentCache;
-
-    // trace
-    protected final long metadataLatencyWarnThresholdMillis;
-
-    // Stats
-    private final OpStatsLogger getInprogressSegmentStat;
-    private final OpStatsLogger getCompletedSegmentStat;
-    private final OpStatsLogger negativeGetInprogressSegmentStat;
-    private final OpStatsLogger negativeGetCompletedSegmentStat;
-    private final OpStatsLogger recoverLastEntryStats;
-    private final OpStatsLogger recoverScannedEntriesStats;
-
-    /**
-     * Construct a log handler.
-     */
-    BKLogHandler(LogMetadata metadata,
-                 DistributedLogConfiguration conf,
-                 LogStreamMetadataStore streamMetadataStore,
-                 LogSegmentMetadataCache metadataCache,
-                 LogSegmentEntryStore entryStore,
-                 OrderedScheduler scheduler,
-                 StatsLogger statsLogger,
-                 AlertStatsLogger alertStatsLogger,
-                 String lockClientId) {
-        this.logMetadata = metadata;
-        this.conf = conf;
-        this.scheduler = scheduler;
-        this.statsLogger = statsLogger;
-        this.alertStatsLogger = alertStatsLogger;
-        this.logSegmentCache = new PerStreamLogSegmentCache(
-                metadata.getLogName(),
-                conf.isLogSegmentSequenceNumberValidationEnabled());
-        firstNumEntriesPerReadLastRecordScan = conf.getFirstNumEntriesPerReadLastRecordScan();
-        maxNumEntriesPerReadLastRecordScan = conf.getMaxNumEntriesPerReadLastRecordScan();
-        this.streamMetadataStore = streamMetadataStore;
-        this.metadataStore = streamMetadataStore.getLogSegmentMetadataStore();
-        this.metadataCache = metadataCache;
-        this.entryStore = entryStore;
-        this.lockClientId = lockClientId;
-
-        // Traces
-        this.metadataLatencyWarnThresholdMillis = conf.getMetadataLatencyWarnThresholdMillis();
-
-        // Stats
-        StatsLogger segmentsLogger = statsLogger.scope("logsegments");
-        getInprogressSegmentStat = segmentsLogger.getOpStatsLogger("get_inprogress_segment");
-        getCompletedSegmentStat = segmentsLogger.getOpStatsLogger("get_completed_segment");
-        negativeGetInprogressSegmentStat = segmentsLogger.getOpStatsLogger("negative_get_inprogress_segment");
-        negativeGetCompletedSegmentStat = segmentsLogger.getOpStatsLogger("negative_get_completed_segment");
-        recoverLastEntryStats = segmentsLogger.getOpStatsLogger("recover_last_entry");
-        recoverScannedEntriesStats = segmentsLogger.getOpStatsLogger("recover_scanned_entries");
-    }
-
-    BKLogHandler checkMetadataException() throws IOException {
-        if (null != metadataException.get()) {
-            throw metadataException.get();
-        }
-        return this;
-    }
-
-    public void reportGetSegmentStats(boolean enabled) {
-        this.reportGetSegmentStats = enabled;
-    }
-
-    public String getLockClientId() {
-        return lockClientId;
-    }
-
-    public Future<LogRecordWithDLSN> asyncGetFirstLogRecord() {
-        final Promise<LogRecordWithDLSN> promise = new Promise<LogRecordWithDLSN>();
-        streamMetadataStore.logExists(logMetadata.getUri(), logMetadata.getLogName())
-                .addEventListener(new FutureEventListener<Void>() {
-            @Override
-            public void onSuccess(Void value) {
-                readLogSegmentsFromStore(
-                        LogSegmentMetadata.COMPARATOR,
-                        LogSegmentFilter.DEFAULT_FILTER,
-                        null
-                ).addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
-
-                    @Override
-                    public void onSuccess(Versioned<List<LogSegmentMetadata>> ledgerList) {
-                        if (ledgerList.getValue().isEmpty()) {
-                            promise.setException(new LogEmptyException("Log " + getFullyQualifiedName() + " has no records"));
-                            return;
-                        }
-                        Future<LogRecordWithDLSN> firstRecord = null;
-                        for (LogSegmentMetadata ledger : ledgerList.getValue()) {
-                            if (!ledger.isTruncated() && (ledger.getRecordCount() > 0 || ledger.isInProgress())) {
-                                firstRecord = asyncReadFirstUserRecord(ledger, DLSN.InitialDLSN);
-                                break;
-                            }
-                        }
-                        if (null != firstRecord) {
-                            promise.become(firstRecord);
-                        } else {
-                            promise.setException(new LogEmptyException("Log " + getFullyQualifiedName() + " has no records"));
-                        }
-                    }
-
-                    @Override
-                    public void onFailure(Throwable cause) {
-                        promise.setException(cause);
-                    }
-                });
-            }
-
-            @Override
-            public void onFailure(Throwable cause) {
-                promise.setException(cause);
-            }
-        });
-        return promise;
-    }
-
-    public Future<LogRecordWithDLSN> getLastLogRecordAsync(final boolean recover, final boolean includeEndOfStream) {
-        final Promise<LogRecordWithDLSN> promise = new Promise<LogRecordWithDLSN>();
-        streamMetadataStore.logExists(logMetadata.getUri(), logMetadata.getLogName())
-                .addEventListener(new FutureEventListener<Void>() {
-            @Override
-            public void onSuccess(Void value) {
-                readLogSegmentsFromStore(
-                        LogSegmentMetadata.DESC_COMPARATOR,
-                        LogSegmentFilter.DEFAULT_FILTER,
-                        null
-                ).addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
-
-                    @Override
-                    public void onSuccess(Versioned<List<LogSegmentMetadata>> ledgerList) {
-                        if (ledgerList.getValue().isEmpty()) {
-                            promise.setException(
-                                    new LogEmptyException("Log " + getFullyQualifiedName() + " has no records"));
-                            return;
-                        }
-                        asyncGetLastLogRecord(
-                                ledgerList.getValue().iterator(),
-                                promise,
-                                recover,
-                                false,
-                                includeEndOfStream);
-                    }
-
-                    @Override
-                    public void onFailure(Throwable cause) {
-                        promise.setException(cause);
-                    }
-                });
-            }
-
-            @Override
-            public void onFailure(Throwable cause) {
-                promise.setException(cause);
-            }
-        });
-        return promise;
-    }
-
-    private void asyncGetLastLogRecord(final Iterator<LogSegmentMetadata> ledgerIter,
-                                       final Promise<LogRecordWithDLSN> promise,
-                                       final boolean fence,
-                                       final boolean includeControlRecord,
-                                       final boolean includeEndOfStream) {
-        if (ledgerIter.hasNext()) {
-            LogSegmentMetadata metadata = ledgerIter.next();
-            asyncReadLastRecord(metadata, fence, includeControlRecord, includeEndOfStream).addEventListener(
-                    new FutureEventListener<LogRecordWithDLSN>() {
-                        @Override
-                        public void onSuccess(LogRecordWithDLSN record) {
-                            if (null == record) {
-                                asyncGetLastLogRecord(ledgerIter, promise, fence, includeControlRecord, includeEndOfStream);
-                            } else {
-                                promise.setValue(record);
-                            }
-                        }
-
-                        @Override
-                        public void onFailure(Throwable cause) {
-                            promise.setException(cause);
-                        }
-                    }
-            );
-        } else {
-            promise.setException(new LogEmptyException("Log " + getFullyQualifiedName() + " has no records"));
-        }
-    }
-
-    private Future<LogRecordWithDLSN> asyncReadFirstUserRecord(LogSegmentMetadata ledger, DLSN beginDLSN) {
-        return ReadUtils.asyncReadFirstUserRecord(
-                getFullyQualifiedName(),
-                ledger,
-                firstNumEntriesPerReadLastRecordScan,
-                maxNumEntriesPerReadLastRecordScan,
-                new AtomicInteger(0),
-                scheduler,
-                entryStore,
-                beginDLSN
-        );
-    }
-
-    /**
-     * This is a helper method to compactly return the record count between two records, the first denoted by
-     * beginDLSN and the second denoted by endPosition. It's up to the caller to ensure that endPosition refers to
-     * a position in the same ledger as beginDLSN.
-     */
-    private Future<Long> asyncGetLogRecordCount(LogSegmentMetadata ledger, final DLSN beginDLSN, final long endPosition) {
-        return asyncReadFirstUserRecord(ledger, beginDLSN).map(new Function<LogRecordWithDLSN, Long>() {
-            public Long apply(final LogRecordWithDLSN beginRecord) {
-                long recordCount = 0;
-                if (null != beginRecord) {
-                    recordCount = endPosition + 1 - beginRecord.getLastPositionWithinLogSegment();
-                }
-                return recordCount;
-            }
-        });
-    }
-
-    /**
-     * Ledger metadata tells us how many records are in each completed segment, but for the first and last segments
-     * we may have to crack open the entry and count. For the first entry, we need to do so because beginDLSN may be
-     * an interior entry. For the last entry, if it is inprogress, we need to recover it and find the last user
-     * entry.
-     */
-    private Future<Long> asyncGetLogRecordCount(final LogSegmentMetadata ledger, final DLSN beginDLSN) {
-        if (ledger.isInProgress() && ledger.isDLSNinThisSegment(beginDLSN)) {
-            return asyncReadLastUserRecord(ledger).flatMap(new Function<LogRecordWithDLSN, Future<Long>>() {
-                public Future<Long> apply(final LogRecordWithDLSN endRecord) {
-                    if (null != endRecord) {
-                        return asyncGetLogRecordCount(ledger, beginDLSN, endRecord.getLastPositionWithinLogSegment() /* end position */);
-                    } else {
-                        return Future.value((long) 0);
-                    }
-                }
-            });
-        } else if (ledger.isInProgress()) {
-            return asyncReadLastUserRecord(ledger).map(new Function<LogRecordWithDLSN, Long>() {
-                public Long apply(final LogRecordWithDLSN endRecord) {
-                    if (null != endRecord) {
-                        return (long) endRecord.getLastPositionWithinLogSegment();
-                    } else {
-                        return (long) 0;
-                    }
-                }
-            });
-        } else if (ledger.isDLSNinThisSegment(beginDLSN)) {
-            return asyncGetLogRecordCount(ledger, beginDLSN, ledger.getRecordCount() /* end position */);
-        } else {
-            return Future.value((long) ledger.getRecordCount());
-        }
-    }
-
-    /**
-     * Get a count of records between beginDLSN and the end of the stream.
-     *
-     * @param beginDLSN dlsn marking the start of the range
-     * @return the count of records present in the range
-     */
-    public Future<Long> asyncGetLogRecordCount(final DLSN beginDLSN) {
-        return streamMetadataStore.logExists(logMetadata.getUri(), logMetadata.getLogName())
-                .flatMap(new Function<Void, Future<Long>>() {
-            public Future<Long> apply(Void done) {
-
-                return readLogSegmentsFromStore(
-                        LogSegmentMetadata.COMPARATOR,
-                        LogSegmentFilter.DEFAULT_FILTER,
-                        null
-                ).flatMap(new Function<Versioned<List<LogSegmentMetadata>>, Future<Long>>() {
-                    public Future<Long> apply(Versioned<List<LogSegmentMetadata>> ledgerList) {
-
-                        List<Future<Long>> futureCounts = new ArrayList<Future<Long>>(ledgerList.getValue().size());
-                        for (LogSegmentMetadata ledger : ledgerList.getValue()) {
-                            if (ledger.getLogSegmentSequenceNumber() >= beginDLSN.getLogSegmentSequenceNo()) {
-                                futureCounts.add(asyncGetLogRecordCount(ledger, beginDLSN));
-                            }
-                        }
-                        return Future.collect(futureCounts).map(new Function<List<Long>, Long>() {
-                            public Long apply(List<Long> counts) {
-                                return sum(counts);
-                            }
-                        });
-                    }
-                });
-            }
-        });
-    }
-
-    private Long sum(List<Long> values) {
-        long sum = 0;
-        for (Long value : values) {
-            sum += value;
-        }
-        return sum;
-    }
-
-    @Override
-    public Future<Void> asyncAbort() {
-        return asyncClose();
-    }
-
-    public Future<LogRecordWithDLSN> asyncReadLastUserRecord(final LogSegmentMetadata l) {
-        return asyncReadLastRecord(l, false, false, false);
-    }
-
-    public Future<LogRecordWithDLSN> asyncReadLastRecord(final LogSegmentMetadata l,
-                                                         final boolean fence,
-                                                         final boolean includeControl,
-                                                         final boolean includeEndOfStream) {
-        final AtomicInteger numRecordsScanned = new AtomicInteger(0);
-        final Stopwatch stopwatch = Stopwatch.createStarted();
-        return ReadUtils.asyncReadLastRecord(
-                getFullyQualifiedName(),
-                l,
-                fence,
-                includeControl,
-                includeEndOfStream,
-                firstNumEntriesPerReadLastRecordScan,
-                maxNumEntriesPerReadLastRecordScan,
-                numRecordsScanned,
-                scheduler,
-                entryStore
-        ).addEventListener(new FutureEventListener<LogRecordWithDLSN>() {
-            @Override
-            public void onSuccess(LogRecordWithDLSN value) {
-                recoverLastEntryStats.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
-                recoverScannedEntriesStats.registerSuccessfulEvent(numRecordsScanned.get());
-            }
-
-            @Override
-            public void onFailure(Throwable cause) {
-                recoverLastEntryStats.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
-            }
-        });
-    }
-
-    protected void setLastLedgerRollingTimeMillis(long rollingTimeMillis) {
-        if (lastLedgerRollingTimeMillis < rollingTimeMillis) {
-            lastLedgerRollingTimeMillis = rollingTimeMillis;
-        }
-    }
-
-    public String getFullyQualifiedName() {
-        return logMetadata.getFullyQualifiedName();
-    }
-
-    // Log Segments Related Functions
-    //
-    // ***Note***
-    // Getting the log segment list should go through #getCachedLogSegments as we need to assign the start sequence id
-    // for an inprogress log segment so the reader can generate the right sequence id.
-    //
-    // *** PerStreamCache vs LogSegmentMetadataCache ***
-    // The per stream cache maintains the list of segments per stream, while the metadata cache
-    // maintains log segments. The metadata cache is just to reduce the access to zookeeper; it is
-    // okay that some of the log segments are not in the cache. However, the per stream cache cannot
-    // have any gaps between log segment sequence numbers, since it has to be accurate.
-
-    /**
-     * Get the cached log segments.
-     *
-     * @param comparator the comparator to sort the returned log segments.
-     * @return list of sorted log segments
-     * @throws UnexpectedException if unexpected condition detected.
-     */
-    protected List<LogSegmentMetadata> getCachedLogSegments(Comparator<LogSegmentMetadata> comparator)
-        throws UnexpectedException {
-        try {
-            return logSegmentCache.getLogSegments(comparator);
-        } catch (UnexpectedException ue) {
-            // the log segments cache went wrong
-            LOG.error("Unexpected exception on getting log segments from the cache for stream {}",
-                    getFullyQualifiedName(), ue);
-            metadataException.compareAndSet(null, ue);
-            throw ue;
-        }
-    }
-
-    /**
-     * Add the segment <i>metadata</i> for <i>name</i> in the cache.
-     *
-     * @param name
-     *          segment znode name.
-     * @param metadata
-     *          segment metadata.
-     */
-    protected void addLogSegmentToCache(String name, LogSegmentMetadata metadata) {
-        metadataCache.put(metadata.getZkPath(), metadata);
-        logSegmentCache.add(name, metadata);
-        // update the last ledger rolling time
-        if (!metadata.isInProgress() && (lastLedgerRollingTimeMillis < metadata.getCompletionTime())) {
-            lastLedgerRollingTimeMillis = metadata.getCompletionTime();
-        }
-
-        if (reportGetSegmentStats) {
-            // update stats
-            long ts = System.currentTimeMillis();
-            if (metadata.isInProgress()) {
-                // as we used timestamp as start tx id we could take it as start time
-                // NOTE: it is a hack here.
-                long elapsedMillis = ts - metadata.getFirstTxId();
-                long elapsedMicroSec = TimeUnit.MILLISECONDS.toMicros(elapsedMillis);
-                if (elapsedMicroSec > 0) {
-                    if (elapsedMillis > metadataLatencyWarnThresholdMillis) {
-                        LOG.warn("{} received inprogress log segment in {} millis: {}",
-                                 new Object[] { getFullyQualifiedName(), elapsedMillis, metadata });
-                    }
-                    getInprogressSegmentStat.registerSuccessfulEvent(elapsedMicroSec);
-                } else {
-                    negativeGetInprogressSegmentStat.registerSuccessfulEvent(-elapsedMicroSec);
-                }
-            } else {
-                long elapsedMillis = ts - metadata.getCompletionTime();
-                long elapsedMicroSec = TimeUnit.MILLISECONDS.toMicros(elapsedMillis);
-                if (elapsedMicroSec > 0) {
-                    if (elapsedMillis > metadataLatencyWarnThresholdMillis) {
-                        LOG.warn("{} received completed log segment in {} millis : {}",
-                                 new Object[] { getFullyQualifiedName(), elapsedMillis, metadata });
-                    }
-                    getCompletedSegmentStat.registerSuccessfulEvent(elapsedMicroSec);
-                } else {
-                    negativeGetCompletedSegmentStat.registerSuccessfulEvent(-elapsedMicroSec);
-                }
-            }
-        }
-    }
-
-    /**
-     * Read log segment <i>name</i> from the cache.
-     *
-     * @param name name of the log segment
-     * @return log segment metadata
-     */
-    protected LogSegmentMetadata readLogSegmentFromCache(String name) {
-        return logSegmentCache.get(name);
-    }
-
-    /**
-     * Remove the log segment <i>name</i> from the cache.
-     *
-     * @param name name of the log segment.
-     * @return log segment metadata
-     */
-    protected LogSegmentMetadata removeLogSegmentFromCache(String name) {
-        metadataCache.invalidate(name);
-        return logSegmentCache.remove(name);
-    }
-
-    /**
-     * Update the log segment cache with updated mapping
-     *
-     * @param logSegmentsRemoved log segments removed
-     * @param logSegmentsAdded log segments added
-     */
-    protected void updateLogSegmentCache(Set<String> logSegmentsRemoved,
-                                         Map<String, LogSegmentMetadata> logSegmentsAdded) {
-        for (String segmentName : logSegmentsRemoved) {
-            metadataCache.invalidate(segmentName);
-        }
-        for (Map.Entry<String, LogSegmentMetadata> entry : logSegmentsAdded.entrySet()) {
-            metadataCache.put(entry.getKey(), entry.getValue());
-        }
-        logSegmentCache.update(logSegmentsRemoved, logSegmentsAdded);
-    }
-
-    /**
-     * Read the log segments from the store and register a listener.
-     * @param comparator
-     * @param segmentFilter
-     * @param logSegmentNamesListener
-     * @return future representing the list of log segments
-     */
-    public Future<Versioned<List<LogSegmentMetadata>>> readLogSegmentsFromStore(
-            final Comparator<LogSegmentMetadata> comparator,
-            final LogSegmentFilter segmentFilter,
-            final LogSegmentNamesListener logSegmentNamesListener) {
-        final Promise<Versioned<List<LogSegmentMetadata>>> readResult =
-                new Promise<Versioned<List<LogSegmentMetadata>>>();
-        metadataStore.getLogSegmentNames(logMetadata.getLogSegmentsPath(), logSegmentNamesListener)
-                .addEventListener(new FutureEventListener<Versioned<List<String>>>() {
-                    @Override
-                    public void onFailure(Throwable cause) {
-                        FutureUtils.setException(readResult, cause);
-                    }
-
-                    @Override
-                    public void onSuccess(Versioned<List<String>> logSegmentNames) {
-                        readLogSegmentsFromStore(logSegmentNames, comparator, segmentFilter, readResult);
-                    }
-                });
-        return readResult;
-    }
-
-    protected void readLogSegmentsFromStore(final Versioned<List<String>> logSegmentNames,
-                                            final Comparator<LogSegmentMetadata> comparator,
-                                            final LogSegmentFilter segmentFilter,
-                                            final Promise<Versioned<List<LogSegmentMetadata>>> readResult) {
-        Set<String> segmentsReceived = new HashSet<String>();
-        segmentsReceived.addAll(segmentFilter.filter(logSegmentNames.getValue()));
-        Set<String> segmentsAdded;
-        final Set<String> removedSegments = Collections.synchronizedSet(new HashSet<String>());
-        final Map<String, LogSegmentMetadata> addedSegments =
-                Collections.synchronizedMap(new HashMap<String, LogSegmentMetadata>());
-        Pair<Set<String>, Set<String>> segmentChanges = logSegmentCache.diff(segmentsReceived);
-        segmentsAdded = segmentChanges.getLeft();
-        removedSegments.addAll(segmentChanges.getRight());
-
-        if (segmentsAdded.isEmpty()) {
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("No segments added for {}.", getFullyQualifiedName());
-            }
-
-            // update the cache before #getCachedLogSegments to return
-            updateLogSegmentCache(removedSegments, addedSegments);
-
-            List<LogSegmentMetadata> segmentList;
-            try {
-                segmentList = getCachedLogSegments(comparator);
-            } catch (UnexpectedException e) {
-                FutureUtils.setException(readResult, e);
-                return;
-            }
-
-            FutureUtils.setValue(readResult,
-                    new Versioned<List<LogSegmentMetadata>>(segmentList, logSegmentNames.getVersion()));
-            return;
-        }
-
-        final AtomicInteger numChildren = new AtomicInteger(segmentsAdded.size());
-        final AtomicInteger numFailures = new AtomicInteger(0);
-        for (final String segment: segmentsAdded) {
-            String logSegmentPath = logMetadata.getLogSegmentPath(segment);
-            LogSegmentMetadata cachedSegment = metadataCache.get(logSegmentPath);
-            if (null != cachedSegment) {
-                addedSegments.put(segment, cachedSegment);
-                completeReadLogSegmentsFromStore(
-                        removedSegments,
-                        addedSegments,
-                        comparator,
-                        readResult,
-                        logSegmentNames.getVersion(),
-                        numChildren,
-                        numFailures);
-                continue;
-            }
-            metadataStore.getLogSegment(logSegmentPath)
-                    .addEventListener(new FutureEventListener<LogSegmentMetadata>() {
-
-                        @Override
-                        public void onSuccess(LogSegmentMetadata result) {
-                            addedSegments.put(segment, result);
-                            complete();
-                        }
-
-                        @Override
-                        public void onFailure(Throwable cause) {
-                            // A LogSegmentNotFoundException is possible in two cases:
-                            // 1. A log segment was deleted by truncation between the call to getChildren and the read
-                            // attempt on the znode corresponding to the segment
-                            // 2. An inprogress segment has been completed => the inprogress znode does not exist
-                            if (cause instanceof LogSegmentNotFoundException) {
-                                removedSegments.add(segment);
-                                complete();
-                            } else {
-                                // fail fast
-                                if (1 == numFailures.incrementAndGet()) {
-                                    FutureUtils.setException(readResult, cause);
-                                    return;
-                                }
-                            }
-                        }
-
-                        private void complete() {
-                            completeReadLogSegmentsFromStore(
-                                    removedSegments,
-                                    addedSegments,
-                                    comparator,
-                                    readResult,
-                                    logSegmentNames.getVersion(),
-                                    numChildren,
-                                    numFailures);
-                        }
-                    });
-        }
-    }
-
-    private void completeReadLogSegmentsFromStore(final Set<String> removedSegments,
-                                                  final Map<String, LogSegmentMetadata> addedSegments,
-                                                  final Comparator<LogSegmentMetadata> comparator,
-                                                  final Promise<Versioned<List<LogSegmentMetadata>>> readResult,
-                                                  final Version logSegmentNamesVersion,
-                                                  final AtomicInteger numChildren,
-                                                  final AtomicInteger numFailures) {
-        if (0 != numChildren.decrementAndGet()) {
-            return;
-        }
-        if (numFailures.get() > 0) {
-            return;
-        }
-        // update the cache only when fetch completed and before #getCachedLogSegments
-        updateLogSegmentCache(removedSegments, addedSegments);
-        List<LogSegmentMetadata> segmentList;
-        try {
-            segmentList = getCachedLogSegments(comparator);
-        } catch (UnexpectedException e) {
-            FutureUtils.setException(readResult, e);
-            return;
-        }
-        FutureUtils.setValue(readResult,
-            new Versioned<List<LogSegmentMetadata>>(segmentList, logSegmentNamesVersion));
-    }
-
-}
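
The fan-out in readLogSegmentsFromStore/completeReadLogSegmentsFromStore above fetches metadata for each newly seen segment and completes the result only after the last child fetch finishes, failing fast on the first unexpected error. A minimal standalone sketch of that count-down pattern, using java.util.concurrent.CompletableFuture as a stand-in for com.twitter.util.Future (an assumption made purely to keep the sketch self-contained; the deleted class uses Twitter futures with an AtomicInteger numChildren counter and a numFailures guard):

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class FanOutFetchSketch {

    // Stand-in for metadataStore.getLogSegment(path): fetch one segment's metadata.
    static CompletableFuture<String> fetchSegment(String name) {
        return CompletableFuture.supplyAsync(() -> "metadata-of-" + name);
    }

    // Fetch every name; complete `result` once the per-child countdown hits
    // zero, and fail fast so only the first failure is reported.
    static CompletableFuture<Map<String, String>> fetchAll(List<String> names) {
        CompletableFuture<Map<String, String>> result = new CompletableFuture<>();
        Map<String, String> fetched = new ConcurrentHashMap<>();
        AtomicInteger remaining = new AtomicInteger(names.size());
        if (names.isEmpty()) {
            // mirrors the segmentsAdded.isEmpty() short-circuit above
            result.complete(fetched);
            return result;
        }
        for (String name : names) {
            fetchSegment(name).whenComplete((metadata, cause) -> {
                if (cause != null) {
                    // fail fast: later calls to completeExceptionally are no-ops
                    result.completeExceptionally(cause);
                    return;
                }
                fetched.put(name, metadata);
                if (remaining.decrementAndGet() == 0) {
                    result.complete(fetched);
                }
            });
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(fetchAll(List.of("logsegment_1", "logsegment_2")).join());
    }
}

Note the same ordering discipline as the deleted code: the shared map is populated before the countdown is decremented, so the thread that observes zero sees every entry.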

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
deleted file mode 100644
index 8aa00e7..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
+++ /dev/null
@@ -1,431 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.twitter.distributedlog.callback.LogSegmentListener;
-import com.twitter.distributedlog.callback.LogSegmentNamesListener;
-import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
-import com.twitter.distributedlog.exceptions.DLIllegalStateException;
-import com.twitter.distributedlog.exceptions.LockingException;
-import com.twitter.distributedlog.exceptions.LogNotFoundException;
-import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException;
-import com.twitter.distributedlog.exceptions.UnexpectedException;
-import com.twitter.distributedlog.logsegment.LogSegmentEntryStore;
-import com.twitter.distributedlog.metadata.LogMetadataForReader;
-import com.twitter.distributedlog.lock.DistributedLock;
-import com.twitter.distributedlog.logsegment.LogSegmentFilter;
-import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache;
-import com.twitter.distributedlog.metadata.LogStreamMetadataStore;
-import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.distributedlog.util.OrderedScheduler;
-import com.twitter.distributedlog.util.Utils;
-import com.twitter.util.ExceptionalFunction;
-import com.twitter.util.Function;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import com.twitter.util.Return;
-import com.twitter.util.Throw;
-import com.twitter.util.Try;
-import org.apache.bookkeeper.stats.AlertStatsLogger;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.util.SafeRunnable;
-import org.apache.bookkeeper.versioning.Version;
-import org.apache.bookkeeper.versioning.Versioned;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
-
-import javax.annotation.Nullable;
-
-/**
- * Log Handler for Readers.
- * <h3>Metrics</h3>
- *
- * <h4>ReadAhead Worker</h4>
 * Most of the readahead stats are exposed under scope `readahead_worker`. Only readahead exceptions are exposed
- * in the parent scope via <code>readAheadExceptionsLogger</code>.
- * <ul>
- * <li> `readahead_worker`/wait: counter. the number of times the readahead worker waits. If this keeps increasing,
- * it usually means the readahead cache keeps getting full because the reader slows down reading.
- * <li> `readahead_worker`/repositions: counter. the number of repositions that the readahead worker encounters. A
- * reposition means that the readahead worker finds that it isn't advancing to a new log segment and forces a re-position.
- * <li> `readahead_worker`/entry_piggy_back_hits: counter. it increases when the last add confirmed is advanced
- * because of the piggy-back lac.
- * <li> `readahead_worker`/entry_piggy_back_misses: counter. it increases when the last add confirmed isn't advanced
- * by a read entry because it doesn't piggy back a newer lac.
- * <li> `readahead_worker`/read_entries: opstats. stats on the number of entries read per readahead read batch.
- * <li> `readahead_worker`/read_lac_counter: counter. stats on the number of readLastConfirmed operations.
- * <li> `readahead_worker`/read_lac_and_entry_counter: counter. stats on the number of readLastConfirmedAndEntry
- * operations.
- * <li> `readahead_worker`/cache_full: counter. it increases each time the readahead worker finds the cache has become
- * full. If it keeps increasing, that means the reader slows down reading.
- * <li> `readahead_worker`/resume: opstats. stats on the readahead worker resuming reading from the wait state.
- * <li> `readahead_worker`/read_lac_lag: opstats. stats on the number of entries between the lac the reader knew
- * last time and the lac that it received. If the `lag` between two subsequent lacs is high, the delay might be high,
- * because the reader is only allowed to read entries after the lac is advanced.
- * <li> `readahead_worker`/long_poll_interruption: opstats. stats on the number of interruptions that happened to long
- * poll. The interruptions are usually because of receiving zookeeper notifications.
- * <li> `readahead_worker`/notification_execution: opstats. stats on executions over the notifications received from
- * zookeeper.
- * <li> `readahead_worker`/metadata_reinitialization: opstats. stats on metadata reinitialization after receiving
- * notification from log segment updates.
- * <li> `readahead_worker`/idle_reader_warn: counter. it increases each time the readahead worker detects itself
- * becoming idle.
- * </ul>
- * <h4>Read Lock</h4>
- * All read lock related stats are exposed under scope `read_lock`;
- * see that scope for detailed stats.
- */
-class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener {
-    static final Logger LOG = LoggerFactory.getLogger(BKLogReadHandler.class);
-
-    protected final LogMetadataForReader logMetadataForReader;
-
-    protected final DynamicDistributedLogConfiguration dynConf;
-
-    private final Optional<String> subscriberId;
-    private DistributedLock readLock;
-    private Future<Void> lockAcquireFuture;
-
-    // notify the state change about the read handler
-    protected final AsyncNotification readerStateNotification;
-
-    // log segments listener
-    protected boolean logSegmentsNotificationDisabled = false;
-    protected final CopyOnWriteArraySet<LogSegmentListener> listeners =
-            new CopyOnWriteArraySet<LogSegmentListener>();
-    protected Versioned<List<LogSegmentMetadata>> lastNotifiedLogSegments =
-            new Versioned<List<LogSegmentMetadata>>(null, Version.NEW);
-
-    // stats
-    private final StatsLogger perLogStatsLogger;
-
-    /**
-     * Construct a read handler.
-     */
-    BKLogReadHandler(LogMetadataForReader logMetadata,
-                     Optional<String> subscriberId,
-                     DistributedLogConfiguration conf,
-                     DynamicDistributedLogConfiguration dynConf,
-                     LogStreamMetadataStore streamMetadataStore,
-                     LogSegmentMetadataCache metadataCache,
-                     LogSegmentEntryStore entryStore,
-                     OrderedScheduler scheduler,
-                     AlertStatsLogger alertStatsLogger,
-                     StatsLogger statsLogger,
-                     StatsLogger perLogStatsLogger,
-                     String clientId,
-                     AsyncNotification readerStateNotification,
-                     boolean isHandleForReading) {
-        super(logMetadata,
-                conf,
-                streamMetadataStore,
-                metadataCache,
-                entryStore,
-                scheduler,
-                statsLogger,
-                alertStatsLogger,
-                clientId);
-        this.logMetadataForReader = logMetadata;
-        this.dynConf = dynConf;
-        this.perLogStatsLogger =
-                isHandleForReading ? perLogStatsLogger : NullStatsLogger.INSTANCE;
-        this.readerStateNotification = readerStateNotification;
-        this.subscriberId = subscriberId;
-    }
-
-    @VisibleForTesting
-    String getReadLockPath() {
-        return logMetadataForReader.getReadLockPath(subscriberId);
-    }
-
-    <T> void satisfyPromiseAsync(final Promise<T> promise, final Try<T> result) {
-        scheduler.submit(new SafeRunnable() {
-            @Override
-            public void safeRun() {
-                promise.update(result);
-            }
-        });
-    }
-
-    Future<Void> checkLogStreamExists() {
-        return streamMetadataStore.logExists(logMetadata.getUri(), logMetadata.getLogName());
-    }
-
-    /**
-     * Elective stream lock--readers are not required to acquire the lock before using the stream.
-     */
-    synchronized Future<Void> lockStream() {
-        if (null == lockAcquireFuture) {
-            lockAcquireFuture = streamMetadataStore.createReadLock(logMetadataForReader, subscriberId)
-                    .flatMap(new ExceptionalFunction<DistributedLock, Future<Void>>() {
-                        @Override
-                        public Future<Void> applyE(DistributedLock lock) throws Throwable {
-                            BKLogReadHandler.this.readLock = lock;
-                            LOG.info("acquiring readlock {} at {}", getLockClientId(), getReadLockPath());
-                            return acquireLockOnExecutorThread(lock);
-                        }
-                    });
-        }
-        return lockAcquireFuture;
-    }
-
-    /**
-     * Begin asynchronous lock acquire, but ensure that the returned future is satisfied on an
-     * executor service thread.
-     */
-    Future<Void> acquireLockOnExecutorThread(DistributedLock lock) throws LockingException {
-        final Future<? extends DistributedLock> acquireFuture = lock.asyncAcquire();
-
-        // The future we return must be satisfied on an executor service thread. If we simply
-        // return the future returned by asyncAcquire, user callbacks may end up running in
-        // the lock state executor thread, which will cause deadlocks and introduce latency,
-        // etc.
-        final Promise<Void> threadAcquirePromise = new Promise<Void>();
-        threadAcquirePromise.setInterruptHandler(new Function<Throwable, BoxedUnit>() {
-            @Override
-            public BoxedUnit apply(Throwable t) {
-                FutureUtils.cancel(acquireFuture);
-                return null;
-            }
-        });
-        acquireFuture.addEventListener(new FutureEventListener<DistributedLock>() {
-            @Override
-            public void onSuccess(DistributedLock lock) {
-                LOG.info("acquired readlock {} at {}", getLockClientId(), getReadLockPath());
-                satisfyPromiseAsync(threadAcquirePromise, new Return<Void>(null));
-            }
-
-            @Override
-            public void onFailure(Throwable cause) {
-                LOG.info("failed to acquire readlock {} at {}",
-                        new Object[]{ getLockClientId(), getReadLockPath(), cause });
-                satisfyPromiseAsync(threadAcquirePromise, new Throw<Void>(cause));
-            }
-        });
-        return threadAcquirePromise;
-    }
-
-    /**
-     * Check ownership of elective stream lock.
-     */
-    void checkReadLock() throws DLIllegalStateException, LockingException {
-        synchronized (this) {
-            if ((null == lockAcquireFuture) ||
-                (!lockAcquireFuture.isDefined())) {
-                throw new DLIllegalStateException("Attempt to check for lock before it has been acquired successfully");
-            }
-        }
-
-        readLock.checkOwnership();
-    }
-
-    public Future<Void> asyncClose() {
-        DistributedLock lockToClose;
-        synchronized (this) {
-            if (null != lockAcquireFuture && !lockAcquireFuture.isDefined()) {
-                FutureUtils.cancel(lockAcquireFuture);
-            }
-            lockToClose = readLock;
-        }
-        return Utils.closeSequence(scheduler, lockToClose)
-                .flatMap(new AbstractFunction1<Void, Future<Void>>() {
-            @Override
-            public Future<Void> apply(Void result) {
-                // unregister the log segment listener
-                metadataStore.unregisterLogSegmentListener(logMetadata.getLogSegmentsPath(), BKLogReadHandler.this);
-                return Future.Void();
-            }
-        });
-    }
-
-    @Override
-    public Future<Void> asyncAbort() {
-        return asyncClose();
-    }
-
-    /**
-     * Start fetching the log segments and register the {@link LogSegmentNamesListener}.
-     * The future is satisfied only on a successful fetch or a fatal failure.
-     *
-     * @return future represents the fetch result
-     */
-    Future<Versioned<List<LogSegmentMetadata>>> asyncStartFetchLogSegments() {
-        Promise<Versioned<List<LogSegmentMetadata>>> promise =
-                new Promise<Versioned<List<LogSegmentMetadata>>>();
-        asyncStartFetchLogSegments(promise);
-        return promise;
-    }
-
-    void asyncStartFetchLogSegments(final Promise<Versioned<List<LogSegmentMetadata>>> promise) {
-        readLogSegmentsFromStore(
-                LogSegmentMetadata.COMPARATOR,
-                LogSegmentFilter.DEFAULT_FILTER,
-                this).addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
-            @Override
-            public void onFailure(Throwable cause) {
-                if (cause instanceof LogNotFoundException ||
-                        cause instanceof LogSegmentNotFoundException ||
-                        cause instanceof UnexpectedException) {
-                    // indicate some inconsistent behavior, abort
-                    metadataException.compareAndSet(null, (IOException) cause);
-                    // notify the reader that read handler is in error state
-                    notifyReaderOnError(cause);
-                    FutureUtils.setException(promise, cause);
-                    return;
-                }
-                scheduler.schedule(new Runnable() {
-                    @Override
-                    public void run() {
-                        asyncStartFetchLogSegments(promise);
-                    }
-                }, conf.getZKRetryBackoffMaxMillis(), TimeUnit.MILLISECONDS);
-            }
-
-            @Override
-            public void onSuccess(Versioned<List<LogSegmentMetadata>> segments) {
-                // no-op
-                FutureUtils.setValue(promise, segments);
-            }
-        });
-    }
-
-    @VisibleForTesting
-    void disableReadAheadLogSegmentsNotification() {
-        logSegmentsNotificationDisabled = true;
-    }
-
-    @Override
-    public void onSegmentsUpdated(final Versioned<List<String>> segments) {
-        synchronized (this) {
-            if (lastNotifiedLogSegments.getVersion() != Version.NEW &&
-                    lastNotifiedLogSegments.getVersion().compare(segments.getVersion()) != Version.Occurred.BEFORE) {
-                // the log segments have been read, and it is possibly a retry from the last segments update
-                return;
-            }
-        }
-
-        Promise<Versioned<List<LogSegmentMetadata>>> readLogSegmentsPromise =
-                new Promise<Versioned<List<LogSegmentMetadata>>>();
-        readLogSegmentsPromise.addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
-            @Override
-            public void onFailure(Throwable cause) {
-                if (cause instanceof LogNotFoundException ||
-                        cause instanceof LogSegmentNotFoundException ||
-                        cause instanceof UnexpectedException) {
-                    // indicate some inconsistent behavior, abort
-                    metadataException.compareAndSet(null, (IOException) cause);
-                    // notify the reader that read handler is in error state
-                    notifyReaderOnError(cause);
-                    return;
-                }
-                scheduler.schedule(new Runnable() {
-                    @Override
-                    public void run() {
-                        onSegmentsUpdated(segments);
-                    }
-                }, conf.getZKRetryBackoffMaxMillis(), TimeUnit.MILLISECONDS);
-            }
-
-            @Override
-            public void onSuccess(Versioned<List<LogSegmentMetadata>> logSegments) {
-                List<LogSegmentMetadata> segmentsToNotify = null;
-                synchronized (BKLogReadHandler.this) {
-                    Versioned<List<LogSegmentMetadata>> lastLogSegments = lastNotifiedLogSegments;
-                    if (lastLogSegments.getVersion() == Version.NEW ||
-                            lastLogSegments.getVersion().compare(logSegments.getVersion()) == Version.Occurred.BEFORE) {
-                        lastNotifiedLogSegments = logSegments;
-                        segmentsToNotify = logSegments.getValue();
-                    }
-                }
-                if (null != segmentsToNotify) {
-                    notifyUpdatedLogSegments(segmentsToNotify);
-                }
-            }
-        });
-        // log segments list is updated, read their metadata
-        readLogSegmentsFromStore(
-                segments,
-                LogSegmentMetadata.COMPARATOR,
-                LogSegmentFilter.DEFAULT_FILTER,
-                readLogSegmentsPromise);
-    }
-
-    @Override
-    public void onLogStreamDeleted() {
-        notifyLogStreamDeleted();
-    }
-
-    //
-    // Listener for log segments
-    //
-
-    protected void registerListener(@Nullable LogSegmentListener listener) {
-        if (null != listener) {
-            listeners.add(listener);
-        }
-    }
-
-    protected void unregisterListener(@Nullable LogSegmentListener listener) {
-        if (null != listener) {
-            listeners.remove(listener);
-        }
-    }
-
-    protected void notifyUpdatedLogSegments(List<LogSegmentMetadata> segments) {
-        if (logSegmentsNotificationDisabled) {
-            return;
-        }
-
-        for (LogSegmentListener listener : listeners) {
-            List<LogSegmentMetadata> listToReturn =
-                    new ArrayList<LogSegmentMetadata>(segments);
-            Collections.sort(listToReturn, LogSegmentMetadata.COMPARATOR);
-            listener.onSegmentsUpdated(listToReturn);
-        }
-    }
-
-    protected void notifyLogStreamDeleted() {
-        if (logSegmentsNotificationDisabled) {
-            return;
-        }
-
-        for (LogSegmentListener listener : listeners) {
-            listener.onLogStreamDeleted();
-        }
-    }
-
-    // notify the errors
-    protected void notifyReaderOnError(Throwable cause) {
-        if (null != readerStateNotification) {
-            readerStateNotification.notifyOnError(cause);
-        }
-    }
-}
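
acquireLockOnExecutorThread above deliberately re-satisfies the lock-acquire future on the handler's own scheduler so that user callbacks never run on the lock's internal state thread. A minimal sketch of that thread-hop, substituting java.util.concurrent.CompletableFuture and a plain ExecutorService for the Twitter Future and OrderedScheduler used by the deleted class (both substitutions are assumptions made only to keep the sketch self-contained):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadHopSketch {

    // Stand-in for lock.asyncAcquire(): completes on some internal lock-state thread.
    static CompletableFuture<Void> asyncAcquire() {
        return CompletableFuture.runAsync(() -> { /* acquire the lock */ });
    }

    // Complete the returned future on our own executor, so callbacks chained by
    // the caller do not run on the lock's internal thread (the deadlock and
    // latency concern described in the comment above).
    static CompletableFuture<Void> acquireOnExecutor(ExecutorService scheduler) {
        CompletableFuture<Void> promise = new CompletableFuture<>();
        asyncAcquire().whenComplete((v, cause) -> scheduler.submit(() -> {
            if (cause != null) {
                promise.completeExceptionally(cause);
            } else {
                promise.complete(v);
            }
        }));
        return promise;
    }

    public static void main(String[] args) {
        ExecutorService scheduler = Executors.newSingleThreadExecutor();
        acquireOnExecutor(scheduler)
                .thenRun(() -> System.out.println(
                        "callback ran on " + Thread.currentThread().getName()))
                .join();
        scheduler.shutdown();
    }
}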
