This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new e1d72cf4ec [BP-62] Refactor read op, and introduce batchReadOp. (#4190)
e1d72cf4ec is described below

commit e1d72cf4ece2ac15e88cd025aa347125207ed8cc
Author: Yan Zhao <[email protected]>
AuthorDate: Mon Feb 5 09:42:39 2024 +0800

    [BP-62] Refactor read op, and introduce batchReadOp. (#4190)
    
    ### Motivation
    This is the fourth PR for the batch read feature
    (https://github.com/apache/bookkeeper/pull/4051).
    
    Refactor the read op by extracting the shared logic into ReadOpBase, and introduce BatchedReadOp.
---
 .../apache/bookkeeper/client/BatchedReadOp.java    | 321 +++++++++++++
 .../client/ListenerBasedPendingReadOp.java         |   2 +-
 .../apache/bookkeeper/client/PendingReadOp.java    | 526 ++++++---------------
 .../org/apache/bookkeeper/client/ReadOpBase.java   | 293 ++++++++++++
 .../apache/bookkeeper/client/TestParallelRead.java |  17 +-
 5 files changed, 760 insertions(+), 399 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java
new file mode 100644
index 0000000000..4892882e1d
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java
@@ -0,0 +1,321 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import io.netty.buffer.ByteBuf;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
+import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.proto.BookieProtocol;
+import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.BatchedReadEntryCallback;
+import org.apache.bookkeeper.proto.checksum.DigestManager;
+import org.apache.bookkeeper.util.ByteBufList;
+import org.apache.bookkeeper.util.MathUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BatchedReadOp extends ReadOpBase implements 
BatchedReadEntryCallback {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(BatchedReadOp.class);
+
+    final int maxCount;
+    final long maxSize;
+
+    BatchedLedgerEntryRequest request;
+
+    BatchedReadOp(LedgerHandle lh,
+                  ClientContext clientCtx,
+                  long startEntryId,
+                  int maxCount,
+                  long maxSize,
+                  boolean isRecoveryRead) {
+        super(lh, clientCtx, startEntryId, -1L, isRecoveryRead);
+        this.maxCount = maxCount;
+        this.maxSize = maxSize;
+    }
+
+    @Override
+    void initiate() {
+        this.requestTimeNanos = MathUtils.nowInNano();
+        List<BookieId> ensemble = 
getLedgerMetadata().getEnsembleAt(startEntryId);
+        request = new SequenceReadRequest(ensemble, lh.ledgerId, startEntryId, 
maxCount, maxSize);
+        request.read();
+        if (clientCtx.getConf().readSpeculativeRequestPolicy.isPresent()) {
+            speculativeTask = 
clientCtx.getConf().readSpeculativeRequestPolicy.get()
+                    .initiateSpeculativeRequest(clientCtx.getScheduler(), 
request);
+        }
+    }
+
+    @Override
+    protected void submitCallback(int code) {
+        // ensure callback once
+        if (!complete.compareAndSet(false, true)) {
+            return;
+        }
+
+        cancelSpeculativeTask(true);
+
+        long latencyNanos = MathUtils.elapsedNanos(requestTimeNanos);
+        if (code != BKException.Code.OK) {
+            LOG.error(
+                    "Read of ledger entry failed: L{} E{}-E{}, Sent to {}, "
+                            + "Heard from {} : bitset = {}, Error = '{}'. 
First unread entry is ({}, rc = {})",
+                    lh.getId(), startEntryId, endEntryId, sentToHosts, 
heardFromHosts, heardFromHostsBitSet,
+                    BKException.getMessage(code), startEntryId, code);
+            
clientCtx.getClientStats().getReadOpLogger().registerFailedEvent(latencyNanos, 
TimeUnit.NANOSECONDS);
+            // release the entries
+
+            request.close();
+            future.completeExceptionally(BKException.create(code));
+        } else {
+            
clientCtx.getClientStats().getReadOpLogger().registerSuccessfulEvent(latencyNanos,
 TimeUnit.NANOSECONDS);
+            future.complete(LedgerEntriesImpl.create(request.entries));
+        }
+    }
+
+    @Override
+    public void readEntriesComplete(int rc, long ledgerId, long startEntryId, 
ByteBufList bufList, Object ctx) {
+        final ReadContext rctx = (ReadContext) ctx;
+        final BatchedLedgerEntryRequest entry = (BatchedLedgerEntryRequest) 
rctx.entry;
+
+        if (rc != BKException.Code.OK) {
+            entry.logErrorAndReattemptRead(rctx.bookieIndex, rctx.to, "Error: 
" + BKException.getMessage(rc), rc);
+            return;
+        }
+
+        heardFromHosts.add(rctx.to);
+        heardFromHostsBitSet.set(rctx.bookieIndex, true);
+
+        bufList.retain();
+        // if entry has completed don't handle twice
+        if (entry.complete(rctx.bookieIndex, rctx.to, bufList)) {
+            if (!isRecoveryRead) {
+                // do not advance LastAddConfirmed for recovery reads
+                lh.updateLastConfirmed(rctx.getLastAddConfirmed(), 0L);
+            }
+            submitCallback(BKException.Code.OK);
+        } else {
+            bufList.release();
+        }
+    }
+
+    void sendReadTo(int bookieIndex, BookieId to, BatchedLedgerEntryRequest 
entry) throws InterruptedException {
+        if (lh.throttler != null) {
+            lh.throttler.acquire();
+        }
+        if (isRecoveryRead) {
+            int flags = BookieProtocol.FLAG_HIGH_PRIORITY | 
BookieProtocol.FLAG_DO_FENCING;
+            clientCtx.getBookieClient().batchReadEntries(to, lh.ledgerId, 
entry.eId,
+                    maxCount, maxSize, this, new ReadContext(bookieIndex, to, 
entry), flags, lh.ledgerKey);
+        } else {
+            clientCtx.getBookieClient().batchReadEntries(to, lh.ledgerId, 
entry.eId, maxCount, maxSize,
+                    this, new ReadContext(bookieIndex, to, entry), 
BookieProtocol.FLAG_NONE);
+        }
+    }
+
+    abstract class BatchedLedgerEntryRequest extends LedgerEntryRequest {
+
+        //Indicate which ledger the BatchedLedgerEntryRequest is reading.
+        final long lId;
+        final int maxCount;
+        final long maxSize;
+
+        final List<LedgerEntry> entries;
+
+        BatchedLedgerEntryRequest(List<BookieId> ensemble, long lId, long eId, 
int maxCount, long maxSize) {
+            super(ensemble, eId);
+            this.lId = lId;
+            this.maxCount = maxCount;
+            this.maxSize = maxSize;
+            this.entries = new ArrayList<>(maxCount);
+        }
+
+        boolean complete(int bookieIndex, BookieId host, final ByteBufList 
bufList) {
+            if (isComplete()) {
+                return false;
+            }
+            if (!complete.getAndSet(true)) {
+                for (int i = 0; i < bufList.size(); i++) {
+                    ByteBuf buffer = bufList.getBuffer(i);
+                    ByteBuf content;
+                    try {
+                        content = lh.macManager.verifyDigestAndReturnData(eId 
+ i, buffer);
+                    } catch (BKException.BKDigestMatchException e) {
+                        clientCtx.getClientStats().getReadOpDmCounter().inc();
+                        logErrorAndReattemptRead(bookieIndex, host, "Mac 
mismatch",
+                                BKException.Code.DigestMatchException);
+                        return false;
+                    }
+                    rc = BKException.Code.OK;
+                    /*
+                     * The length is a long and it is the last field of the 
metadata of an entry.
+                     * Consequently, we have to subtract 8 from 
METADATA_LENGTH to get the length.
+                     */
+                    LedgerEntryImpl entryImpl =  
LedgerEntryImpl.create(lh.ledgerId, startEntryId + i);
+                    
entryImpl.setLength(buffer.getLong(DigestManager.METADATA_LENGTH - 8));
+                    entryImpl.setEntryBuf(content);
+                    entries.add(entryImpl);
+                }
+                writeSet.recycle();
+                return true;
+            } else {
+                writeSet.recycle();
+                return false;
+            }
+        }
+
+        @Override
+        public String toString() {
+            return String.format("L%d-E%d~%d s-%d", lh.getId(), eId, eId + 
maxCount, maxSize);
+        }
+    }
+
+    class SequenceReadRequest extends BatchedLedgerEntryRequest {
+
+        static final int NOT_FOUND = -1;
+        int nextReplicaIndexToReadFrom = 0;
+        final BitSet sentReplicas;
+        final BitSet erroredReplicas;
+        SequenceReadRequest(List<BookieId> ensemble,
+                            long lId,
+                            long eId,
+                            int maxCount,
+                            long maxSize) {
+            super(ensemble, lId, eId, maxCount, maxSize);
+            this.sentReplicas = new 
BitSet(lh.getLedgerMetadata().getWriteQuorumSize());
+            this.erroredReplicas = new 
BitSet(lh.getLedgerMetadata().getWriteQuorumSize());
+        }
+
+        @Override
+        void read() {
+            sendNextRead();
+        }
+
+        private synchronized int getNextReplicaIndexToReadFrom() {
+            return nextReplicaIndexToReadFrom;
+        }
+
+        private BitSet getSentToBitSet() {
+            BitSet b = new BitSet(ensemble.size());
+
+            for (int i = 0; i < sentReplicas.length(); i++) {
+                if (sentReplicas.get(i)) {
+                    b.set(writeSet.get(i));
+                }
+            }
+            return b;
+        }
+
+        private boolean readsOutstanding() {
+            return (sentReplicas.cardinality() - 
erroredReplicas.cardinality()) > 0;
+        }
+
+        @Override
+        synchronized BookieId maybeSendSpeculativeRead(BitSet heardFrom) {
+            if (nextReplicaIndexToReadFrom >= 
getLedgerMetadata().getWriteQuorumSize()) {
+                return null;
+            }
+
+            BitSet sentTo = getSentToBitSet();
+            sentTo.and(heardFrom);
+
+            // only send another read if we have had no successful response at 
all
+            // (even for other entries) from any of the other bookies we have 
sent the
+            // request to
+            if (sentTo.cardinality() == 0) {
+                clientCtx.getClientStats().getSpeculativeReadCounter().inc();
+                return sendNextRead();
+            } else {
+                return null;
+            }
+        }
+
+        synchronized BookieId sendNextRead() {
+            if (nextReplicaIndexToReadFrom >= 
getLedgerMetadata().getWriteQuorumSize()) {
+                // we are done, the read has failed from all replicas, just 
fail the
+                // read
+                fail(firstError);
+                return null;
+            }
+
+            // ToDo: pick replica with writable PCBC. ISSUE #1239
+            // https://github.com/apache/bookkeeper/issues/1239
+            int replica = nextReplicaIndexToReadFrom;
+            int bookieIndex = writeSet.get(nextReplicaIndexToReadFrom);
+            nextReplicaIndexToReadFrom++;
+
+            try {
+                BookieId to = ensemble.get(bookieIndex);
+                sendReadTo(bookieIndex, to, this);
+                sentToHosts.add(to);
+                sentReplicas.set(replica);
+                return to;
+            } catch (InterruptedException ie) {
+                LOG.error("Interrupted reading entry " + this, ie);
+                Thread.currentThread().interrupt();
+                fail(BKException.Code.InterruptedException);
+                return null;
+            }
+        }
+
+        @Override
+        synchronized void logErrorAndReattemptRead(int bookieIndex, BookieId 
host, String errMsg, int rc) {
+            super.logErrorAndReattemptRead(bookieIndex, host, errMsg, rc);
+            int replica = writeSet.indexOf(bookieIndex);
+            if (replica == NOT_FOUND) {
+                LOG.error("Received error from a host which is not in the 
ensemble {} {}.", host, ensemble);
+                return;
+            }
+            erroredReplicas.set(replica);
+            if (isRecoveryRead && (numBookiesMissingEntry >= 
requiredBookiesMissingEntryForRecovery)) {
+                /* For recovery, report NoSuchEntry as soon as wQ-aQ+1 bookies 
report that they do not
+                 * have the entry */
+                fail(BKException.Code.NoSuchEntryException);
+                return;
+            }
+
+            if (!readsOutstanding()) {
+                sendNextRead();
+            }
+        }
+
+        @Override
+        boolean complete(int bookieIndex, BookieId host, final ByteBufList 
bufList) {
+            boolean completed = super.complete(bookieIndex, host, bufList);
+            if (completed) {
+                int numReplicasTried = getNextReplicaIndexToReadFrom();
+                // Check if any speculative reads were issued and mark any 
slow bookies before
+                // the first successful speculative read as "slow"
+                for (int i = 0; i < numReplicasTried - 1; i++) {
+                    int slowBookieIndex = writeSet.get(i);
+                    BookieId slowBookieSocketAddress = 
ensemble.get(slowBookieIndex);
+                    
clientCtx.getPlacementPolicy().registerSlowBookie(slowBookieSocketAddress, eId);
+                }
+            }
+            return completed;
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ListenerBasedPendingReadOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ListenerBasedPendingReadOp.java
index 6733b2e9ea..fedb79696a 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ListenerBasedPendingReadOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ListenerBasedPendingReadOp.java
@@ -45,7 +45,7 @@ class ListenerBasedPendingReadOp extends PendingReadOp {
 
     @Override
     protected void submitCallback(int code) {
-        LedgerEntryRequest request;
+        SingleLedgerEntryRequest request;
         while (!seq.isEmpty() && (request = seq.getFirst()) != null) {
             if (!request.isComplete()) {
                 return;
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index 73715859c0..15d48c6435 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -21,27 +21,16 @@
 package org.apache.bookkeeper.client;
 
 import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ListenableFuture;
 import io.netty.buffer.ByteBuf;
 import java.util.BitSet;
-import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
-import org.apache.bookkeeper.client.api.LedgerEntries;
-import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
 import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
 import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
-import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallbackCtx;
 import org.apache.bookkeeper.proto.checksum.DigestManager;
 import org.apache.bookkeeper.util.MathUtils;
 import org.slf4j.Logger;
@@ -54,85 +43,165 @@ import org.slf4j.LoggerFactory;
  * application as soon as it arrives rather than waiting for the whole thing.
  *
  */
-class PendingReadOp implements ReadEntryCallback, Runnable {
+class PendingReadOp extends ReadOpBase implements ReadEntryCallback  {
     private static final Logger LOG = 
LoggerFactory.getLogger(PendingReadOp.class);
 
-    private ScheduledFuture<?> speculativeTask = null;
-    protected final LinkedList<LedgerEntryRequest> seq;
-    private final CompletableFuture<LedgerEntries> future;
-    private final Set<BookieId> heardFromHosts;
-    private final BitSet heardFromHostsBitSet;
-    private final Set<BookieId> sentToHosts = new HashSet<BookieId>();
-    LedgerHandle lh;
-    final ClientContext clientCtx;
+    protected boolean parallelRead = false;
+    protected final LinkedList<SingleLedgerEntryRequest> seq;
 
-    long numPendingEntries;
-    final long startEntryId;
-    final long endEntryId;
-    long requestTimeNanos;
+    PendingReadOp(LedgerHandle lh,
+                  ClientContext clientCtx,
+                  long startEntryId,
+                  long endEntryId,
+                  boolean isRecoveryRead) {
+        super(lh, clientCtx, startEntryId, endEntryId, isRecoveryRead);
+        this.seq = new LinkedList<>();
+        numPendingEntries = endEntryId - startEntryId + 1;
+    }
+
+    PendingReadOp parallelRead(boolean enabled) {
+        this.parallelRead = enabled;
+        return this;
+    }
+
+    void initiate() {
+        long nextEnsembleChange = startEntryId, i = startEntryId;
+        this.requestTimeNanos = MathUtils.nowInNano();
+        List<BookieId> ensemble = null;
+        do {
+            if (i == nextEnsembleChange) {
+                ensemble = getLedgerMetadata().getEnsembleAt(i);
+                nextEnsembleChange = 
LedgerMetadataUtils.getNextEnsembleChange(getLedgerMetadata(), i);
+            }
+            SingleLedgerEntryRequest entry;
+            if (parallelRead) {
+                entry = new ParallelReadRequest(ensemble, lh.ledgerId, i);
+            } else {
+                entry = new SequenceReadRequest(ensemble, lh.ledgerId, i);
+            }
+            seq.add(entry);
+            i++;
+        } while (i <= endEntryId);
+        // read the entries.
+        for (LedgerEntryRequest entry : seq) {
+            entry.read();
+            if (!parallelRead && 
clientCtx.getConf().readSpeculativeRequestPolicy.isPresent()) {
+                speculativeTask = 
clientCtx.getConf().readSpeculativeRequestPolicy.get()
+                    .initiateSpeculativeRequest(clientCtx.getScheduler(), 
entry);
+            }
+        }
+    }
 
-    final int requiredBookiesMissingEntryForRecovery;
-    final boolean isRecoveryRead;
+    @Override
+    public void readEntryComplete(int rc, long ledgerId, final long entryId, 
final ByteBuf buffer, Object ctx) {
+        final ReadContext rctx = (ReadContext) ctx;
+        final SingleLedgerEntryRequest entry = (SingleLedgerEntryRequest) 
rctx.entry;
+
+        if (rc != BKException.Code.OK) {
+            entry.logErrorAndReattemptRead(rctx.bookieIndex, rctx.to, "Error: 
" + BKException.getMessage(rc), rc);
+            return;
+        }
+
+        heardFromHosts.add(rctx.to);
+        heardFromHostsBitSet.set(rctx.bookieIndex, true);
+
+        buffer.retain();
+        // if entry has completed don't handle twice
+        if (entry.complete(rctx.bookieIndex, rctx.to, buffer)) {
+            if (!isRecoveryRead) {
+                // do not advance LastAddConfirmed for recovery reads
+                lh.updateLastConfirmed(rctx.getLastAddConfirmed(), 0L);
+            }
+            submitCallback(BKException.Code.OK);
+        } else {
+            buffer.release();
+        }
+
+        if (numPendingEntries < 0) {
+            LOG.error("Read too many values for ledger {} : [{}, {}].",
+                    ledgerId, startEntryId, endEntryId);
+        }
+
+    }
+
+    protected void submitCallback(int code) {
+        if (BKException.Code.OK == code) {
+            numPendingEntries--;
+            if (numPendingEntries != 0) {
+                return;
+            }
+        }
+
+        // ensure callback once
+        if (!complete.compareAndSet(false, true)) {
+            return;
+        }
 
-    boolean parallelRead = false;
-    final AtomicBoolean complete = new AtomicBoolean(false);
-    boolean allowFailFast = false;
+        cancelSpeculativeTask(true);
 
-    abstract class LedgerEntryRequest implements SpeculativeRequestExecutor, 
AutoCloseable {
+        long latencyNanos = MathUtils.elapsedNanos(requestTimeNanos);
+        if (code != BKException.Code.OK) {
+            long firstUnread = LedgerHandle.INVALID_ENTRY_ID;
+            Integer firstRc = null;
+            for (LedgerEntryRequest req : seq) {
+                if (!req.isComplete()) {
+                    firstUnread = req.eId;
+                    firstRc = req.rc;
+                    break;
+                }
+            }
+            LOG.error(
+                    "Read of ledger entry failed: L{} E{}-E{}, Sent to {}, "
+                            + "Heard from {} : bitset = {}, Error = '{}'. 
First unread entry is ({}, rc = {})",
+                    lh.getId(), startEntryId, endEntryId, sentToHosts, 
heardFromHosts, heardFromHostsBitSet,
+                    BKException.getMessage(code), firstUnread, firstRc);
+            
clientCtx.getClientStats().getReadOpLogger().registerFailedEvent(latencyNanos, 
TimeUnit.NANOSECONDS);
+            // release the entries
+            seq.forEach(LedgerEntryRequest::close);
+            future.completeExceptionally(BKException.create(code));
+        } else {
+            
clientCtx.getClientStats().getReadOpLogger().registerSuccessfulEvent(latencyNanos,
 TimeUnit.NANOSECONDS);
+            future.complete(LedgerEntriesImpl.create(Lists.transform(seq, 
input -> input.entryImpl)));
+        }
+    }
 
-        final AtomicBoolean complete = new AtomicBoolean(false);
+    void sendReadTo(int bookieIndex, BookieId to, SingleLedgerEntryRequest 
entry) throws InterruptedException {
+        if (lh.throttler != null) {
+            lh.throttler.acquire();
+        }
 
-        int rc = BKException.Code.OK;
-        int firstError = BKException.Code.OK;
-        int numBookiesMissingEntry = 0;
+        if (isRecoveryRead) {
+            int flags = BookieProtocol.FLAG_HIGH_PRIORITY | 
BookieProtocol.FLAG_DO_FENCING;
+            clientCtx.getBookieClient().readEntry(to, lh.ledgerId, entry.eId,
+                    this, new ReadContext(bookieIndex, to, entry), flags, 
lh.ledgerKey);
+        } else {
+            clientCtx.getBookieClient().readEntry(to, lh.ledgerId, entry.eId,
+                    this, new ReadContext(bookieIndex, to, entry), 
BookieProtocol.FLAG_NONE);
+        }
+    }
 
-        final List<BookieId> ensemble;
-        final DistributionSchedule.WriteSet writeSet;
+    abstract class SingleLedgerEntryRequest extends LedgerEntryRequest {
         final LedgerEntryImpl entryImpl;
-        final long eId;
 
-        LedgerEntryRequest(List<BookieId> ensemble, long lId, long eId) {
+        SingleLedgerEntryRequest(List<BookieId> ensemble, long lId, long eId) {
+            super(ensemble, eId);
             this.entryImpl = LedgerEntryImpl.create(lId, eId);
-            this.ensemble = ensemble;
-            this.eId = eId;
-
-            if (clientCtx.getConf().enableReorderReadSequence) {
-                writeSet = clientCtx.getPlacementPolicy()
-                    .reorderReadSequence(
-                            ensemble,
-                            lh.getBookiesHealthInfo(),
-                            lh.getWriteSetForReadOperation(eId));
-            } else {
-                writeSet = lh.getWriteSetForReadOperation(eId);
-            }
         }
 
         @Override
         public void close() {
-            // this request has succeeded before, can't recycle writeSet again
-            if (complete.compareAndSet(false, true)) {
-                rc = BKException.Code.UnexpectedConditionException;
-                writeSet.recycle();
-            }
+            super.close();
             entryImpl.close();
         }
 
-        /**
-         * Execute the read request.
-         */
-        abstract void read();
-
         /**
          * Complete the read request from <i>host</i>.
          *
-         * @param bookieIndex
-         *          bookie index
-         * @param host
-         *          host that respond the read
-         * @param buffer
-         *          the data buffer
+         * @param bookieIndex bookie index
+         * @param host        host that respond the read
+         * @param buffer      the data buffer
          * @return return true if we managed to complete the entry;
-         *         otherwise return false if the read entry is not complete or 
it is already completed before
+         * otherwise return false if the read entry is not complete or it is 
already completed before
          */
         boolean complete(int bookieIndex, BookieId host, final ByteBuf buffer) 
{
             ByteBuf content;
@@ -141,7 +210,7 @@ class PendingReadOp implements ReadEntryCallback, Runnable {
             }
             try {
                 content = lh.macManager.verifyDigestAndReturnData(eId, buffer);
-            } catch (BKDigestMatchException e) {
+            } catch (BKException.BKDigestMatchException e) {
                 clientCtx.getClientStats().getReadOpDmCounter().inc();
                 logErrorAndReattemptRead(bookieIndex, host, "Mac mismatch", 
BKException.Code.DigestMatchException);
                 return false;
@@ -161,125 +230,9 @@ class PendingReadOp implements ReadEntryCallback, 
Runnable {
                 return false;
             }
         }
-
-        /**
-         * Fail the request with given result code <i>rc</i>.
-         *
-         * @param rc
-         *          result code to fail the request.
-         * @return true if we managed to fail the entry; otherwise return 
false if it already failed or completed.
-         */
-        boolean fail(int rc) {
-            if (complete.compareAndSet(false, true)) {
-                this.rc = rc;
-                submitCallback(rc);
-                return true;
-            } else {
-                return false;
-            }
-        }
-
-        /**
-         * Log error <i>errMsg</i> and reattempt read from <i>host</i>.
-         *
-         * @param bookieIndex
-         *          bookie index
-         * @param host
-         *          host that just respond
-         * @param errMsg
-         *          error msg to log
-         * @param rc
-         *          read result code
-         */
-        synchronized void logErrorAndReattemptRead(int bookieIndex, BookieId 
host, String errMsg, int rc) {
-            if (BKException.Code.OK == firstError
-                || BKException.Code.NoSuchEntryException == firstError
-                || BKException.Code.NoSuchLedgerExistsException == firstError) 
{
-                firstError = rc;
-            } else if (BKException.Code.BookieHandleNotAvailableException == 
firstError
-                       && BKException.Code.NoSuchEntryException != rc
-                       && BKException.Code.NoSuchLedgerExistsException != rc) {
-                // if other exception rather than NoSuchEntryException or 
NoSuchLedgerExistsException is
-                // returned we need to update firstError to indicate that it 
might be a valid read but just
-                // failed.
-                firstError = rc;
-            }
-            if (BKException.Code.NoSuchEntryException == rc
-                || BKException.Code.NoSuchLedgerExistsException == rc) {
-                ++numBookiesMissingEntry;
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("No such entry found on bookie.  L{} E{} bookie: 
{}",
-                            lh.ledgerId, eId, host);
-                }
-            } else {
-                if (LOG.isInfoEnabled()) {
-                    LOG.info("{} while reading L{} E{} from bookie: {}",
-                            errMsg, lh.ledgerId, eId, host);
-                }
-            }
-
-            lh.recordReadErrorOnBookie(bookieIndex);
-        }
-
-        /**
-         * Send to next replica speculatively, if required and possible.
-         * This returns the host we may have sent to for unit testing.
-         *
-         * @param heardFromHostsBitSet
-         *      the set of hosts that we already received responses.
-         * @return host we sent to if we sent. null otherwise.
-         */
-        abstract BookieId maybeSendSpeculativeRead(BitSet 
heardFromHostsBitSet);
-
-        /**
-         * Whether the read request completed.
-         *
-         * @return true if the read request is completed.
-         */
-        boolean isComplete() {
-            return complete.get();
-        }
-
-        /**
-         * Get result code of this entry.
-         *
-         * @return result code.
-         */
-        int getRc() {
-            return rc;
-        }
-
-        @Override
-        public String toString() {
-            return String.format("L%d-E%d", lh.getId(), eId);
-        }
-
-        /**
-         * Issues a speculative request and indicates if more speculative
-         * requests should be issued.
-         *
-         * @return whether more speculative requests should be issued
-         */
-        @Override
-        public ListenableFuture<Boolean> issueSpeculativeRequest() {
-            return clientCtx.getMainWorkerPool().submitOrdered(lh.getId(), new 
Callable<Boolean>() {
-                @Override
-                public Boolean call() throws Exception {
-                    if (!isComplete() && null != 
maybeSendSpeculativeRead(heardFromHostsBitSet)) {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("Send speculative read for {}. Hosts 
sent are {}, "
-                                            + " Hosts heard are {}, ensemble 
is {}.",
-                                this, sentToHosts, heardFromHostsBitSet, 
ensemble);
-                        }
-                        return true;
-                    }
-                    return false;
-                }
-            });
-        }
     }
 
-    class ParallelReadRequest extends LedgerEntryRequest {
+    class ParallelReadRequest extends SingleLedgerEntryRequest {
 
         int numPendings;
 
@@ -326,7 +279,7 @@ class PendingReadOp implements ReadEntryCallback, Runnable {
         }
     }
 
-    class SequenceReadRequest extends LedgerEntryRequest {
+    class SequenceReadRequest extends SingleLedgerEntryRequest {
         static final int NOT_FOUND = -1;
         int nextReplicaIndexToReadFrom = 0;
 
@@ -456,205 +409,4 @@ class PendingReadOp implements ReadEntryCallback, 
Runnable {
             return completed;
         }
     }
-
-    PendingReadOp(LedgerHandle lh,
-                  ClientContext clientCtx,
-                  long startEntryId,
-                  long endEntryId,
-                  boolean isRecoveryRead) {
-        this.seq = new LinkedList<>();
-        this.future = new CompletableFuture<>();
-        this.lh = lh;
-        this.clientCtx = clientCtx;
-        this.startEntryId = startEntryId;
-        this.endEntryId = endEntryId;
-        this.isRecoveryRead = isRecoveryRead;
-
-        this.allowFailFast = false;
-        numPendingEntries = endEntryId - startEntryId + 1;
-        requiredBookiesMissingEntryForRecovery = 
getLedgerMetadata().getWriteQuorumSize()
-                - getLedgerMetadata().getAckQuorumSize() + 1;
-        heardFromHosts = new HashSet<>();
-        heardFromHostsBitSet = new 
BitSet(getLedgerMetadata().getEnsembleSize());
-    }
-
-    CompletableFuture<LedgerEntries> future() {
-        return future;
-    }
-
-    protected LedgerMetadata getLedgerMetadata() {
-        return lh.getLedgerMetadata();
-    }
-
-    protected void cancelSpeculativeTask(boolean mayInterruptIfRunning) {
-        if (speculativeTask != null) {
-            speculativeTask.cancel(mayInterruptIfRunning);
-            speculativeTask = null;
-        }
-    }
-
-    public ScheduledFuture<?> getSpeculativeTask() {
-        return speculativeTask;
-    }
-
-    PendingReadOp parallelRead(boolean enabled) {
-        this.parallelRead = enabled;
-        return this;
-    }
-
-    void allowFailFastOnUnwritableChannel() {
-        allowFailFast = true;
-    }
-
-    public void submit() {
-        clientCtx.getMainWorkerPool().executeOrdered(lh.ledgerId, this);
-    }
-
-    void initiate() {
-        long nextEnsembleChange = startEntryId, i = startEntryId;
-        this.requestTimeNanos = MathUtils.nowInNano();
-        List<BookieId> ensemble = null;
-        do {
-            if (i == nextEnsembleChange) {
-                ensemble = getLedgerMetadata().getEnsembleAt(i);
-                nextEnsembleChange = 
LedgerMetadataUtils.getNextEnsembleChange(getLedgerMetadata(), i);
-            }
-            LedgerEntryRequest entry;
-            if (parallelRead) {
-                entry = new ParallelReadRequest(ensemble, lh.ledgerId, i);
-            } else {
-                entry = new SequenceReadRequest(ensemble, lh.ledgerId, i);
-            }
-            seq.add(entry);
-            i++;
-        } while (i <= endEntryId);
-        // read the entries.
-        for (LedgerEntryRequest entry : seq) {
-            entry.read();
-            if (!parallelRead && 
clientCtx.getConf().readSpeculativeRequestPolicy.isPresent()) {
-                speculativeTask = 
clientCtx.getConf().readSpeculativeRequestPolicy.get()
-                    .initiateSpeculativeRequest(clientCtx.getScheduler(), 
entry);
-            }
-        }
-    }
-
-    @Override
-    public void run() {
-        initiate();
-    }
-
-    private static class ReadContext implements ReadEntryCallbackCtx {
-        final int bookieIndex;
-        final BookieId to;
-        final LedgerEntryRequest entry;
-        long lac = LedgerHandle.INVALID_ENTRY_ID;
-
-        ReadContext(int bookieIndex, BookieId to, LedgerEntryRequest entry) {
-            this.bookieIndex = bookieIndex;
-            this.to = to;
-            this.entry = entry;
-        }
-
-        @Override
-        public void setLastAddConfirmed(long lac) {
-            this.lac = lac;
-        }
-
-        @Override
-        public long getLastAddConfirmed() {
-            return lac;
-        }
-    }
-
-    private static ReadContext createReadContext(int bookieIndex, BookieId to, 
LedgerEntryRequest entry) {
-        return new ReadContext(bookieIndex, to, entry);
-    }
-
-    void sendReadTo(int bookieIndex, BookieId to, LedgerEntryRequest entry) 
throws InterruptedException {
-        if (lh.throttler != null) {
-            lh.throttler.acquire();
-        }
-
-        if (isRecoveryRead) {
-            int flags = BookieProtocol.FLAG_HIGH_PRIORITY | 
BookieProtocol.FLAG_DO_FENCING;
-            clientCtx.getBookieClient().readEntry(to, lh.ledgerId, entry.eId,
-                    this, new ReadContext(bookieIndex, to, entry), flags, 
lh.ledgerKey);
-        } else {
-            clientCtx.getBookieClient().readEntry(to, lh.ledgerId, entry.eId,
-                    this, new ReadContext(bookieIndex, to, entry), 
BookieProtocol.FLAG_NONE);
-        }
-    }
-
-    @Override
-    public void readEntryComplete(int rc, long ledgerId, final long entryId, 
final ByteBuf buffer, Object ctx) {
-        final ReadContext rctx = (ReadContext) ctx;
-        final LedgerEntryRequest entry = rctx.entry;
-
-        if (rc != BKException.Code.OK) {
-            entry.logErrorAndReattemptRead(rctx.bookieIndex, rctx.to, "Error: 
" + BKException.getMessage(rc), rc);
-            return;
-        }
-
-        heardFromHosts.add(rctx.to);
-        heardFromHostsBitSet.set(rctx.bookieIndex, true);
-
-        buffer.retain();
-        // if entry has completed don't handle twice
-        if (entry.complete(rctx.bookieIndex, rctx.to, buffer)) {
-            if (!isRecoveryRead) {
-                // do not advance LastAddConfirmed for recovery reads
-                lh.updateLastConfirmed(rctx.getLastAddConfirmed(), 0L);
-            }
-            submitCallback(BKException.Code.OK);
-        } else {
-            buffer.release();
-        }
-
-        if (numPendingEntries < 0) {
-            LOG.error("Read too many values for ledger {} : [{}, {}].",
-                    ledgerId, startEntryId, endEntryId);
-        }
-    }
-
-    protected void submitCallback(int code) {
-        if (BKException.Code.OK == code) {
-            numPendingEntries--;
-            if (numPendingEntries != 0) {
-                return;
-            }
-        }
-
-        // ensure callback once
-        if (!complete.compareAndSet(false, true)) {
-            return;
-        }
-
-        cancelSpeculativeTask(true);
-
-        long latencyNanos = MathUtils.elapsedNanos(requestTimeNanos);
-        if (code != BKException.Code.OK) {
-            long firstUnread = LedgerHandle.INVALID_ENTRY_ID;
-            Integer firstRc = null;
-            for (LedgerEntryRequest req : seq) {
-                if (!req.isComplete()) {
-                    firstUnread = req.eId;
-                    firstRc = req.rc;
-                    break;
-                }
-            }
-            LOG.error(
-                    "Read of ledger entry failed: L{} E{}-E{}, Sent to {}, "
-                            + "Heard from {} : bitset = {}, Error = '{}'. 
First unread entry is ({}, rc = {})",
-                    lh.getId(), startEntryId, endEntryId, sentToHosts, 
heardFromHosts, heardFromHostsBitSet,
-                    BKException.getMessage(code), firstUnread, firstRc);
-            
clientCtx.getClientStats().getReadOpLogger().registerFailedEvent(latencyNanos, 
TimeUnit.NANOSECONDS);
-            // release the entries
-            seq.forEach(LedgerEntryRequest::close);
-            future.completeExceptionally(BKException.create(code));
-        } else {
-            
clientCtx.getClientStats().getReadOpLogger().registerSuccessfulEvent(latencyNanos,
 TimeUnit.NANOSECONDS);
-            future.complete(LedgerEntriesImpl.create(Lists.transform(seq, 
input -> input.entryImpl)));
-        }
-    }
-
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOpBase.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOpBase.java
new file mode 100644
index 0000000000..cbd68ec657
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOpBase.java
@@ -0,0 +1,293 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.BitSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class ReadOpBase implements Runnable {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ReadOpBase.class);
+
+    protected ScheduledFuture<?> speculativeTask = null;
+    protected final CompletableFuture<LedgerEntries> future;
+    protected final Set<BookieId> heardFromHosts;
+    protected final BitSet heardFromHostsBitSet;
+    protected final Set<BookieId> sentToHosts = new HashSet<BookieId>();
+    LedgerHandle lh;
+    protected ClientContext clientCtx;
+
+    protected final long startEntryId;
+    protected long requestTimeNanos;
+
+    protected final int requiredBookiesMissingEntryForRecovery;
+    protected final boolean isRecoveryRead;
+
+    protected final AtomicBoolean complete = new AtomicBoolean(false);
+    protected boolean allowFailFast = false;
+    long numPendingEntries;
+    final long endEntryId;
+    protected ReadOpBase(LedgerHandle lh, ClientContext clientCtx, long 
startEntryId, long endEntryId,
+                         boolean isRecoveryRead) {
+        this.lh = lh;
+        this.future = new CompletableFuture<>();
+        this.startEntryId = startEntryId;
+        this.endEntryId = endEntryId;
+        this.isRecoveryRead = isRecoveryRead;
+        this.requiredBookiesMissingEntryForRecovery = 
getLedgerMetadata().getWriteQuorumSize()
+                - getLedgerMetadata().getAckQuorumSize() + 1;
+        this.heardFromHosts = new HashSet<>();
+        this.heardFromHostsBitSet = new 
BitSet(getLedgerMetadata().getEnsembleSize());
+        this.allowFailFast = false;
+        this.clientCtx = clientCtx;
+    }
+
+    protected LedgerMetadata getLedgerMetadata() {
+        return lh.getLedgerMetadata();
+    }
+
+    protected void cancelSpeculativeTask(boolean mayInterruptIfRunning) {
+        if (speculativeTask != null) {
+            speculativeTask.cancel(mayInterruptIfRunning);
+            speculativeTask = null;
+        }
+    }
+
+    public ScheduledFuture<?> getSpeculativeTask() {
+        return speculativeTask;
+    }
+
+    CompletableFuture<LedgerEntries> future() {
+        return future;
+    }
+
+    void allowFailFastOnUnwritableChannel() {
+        allowFailFast = true;
+    }
+
+    public void submit() {
+        clientCtx.getMainWorkerPool().executeOrdered(lh.ledgerId, this);
+    }
+
+    @Override
+    public void run() {
+        initiate();
+    }
+
+    abstract void initiate();
+
+    protected abstract void submitCallback(int code);
+
+    abstract class LedgerEntryRequest implements SpeculativeRequestExecutor {
+
+        final AtomicBoolean complete = new AtomicBoolean(false);
+
+        int rc = BKException.Code.OK;
+        int firstError = BKException.Code.OK;
+        int numBookiesMissingEntry = 0;
+
+        final long eId;
+
+        final List<BookieId> ensemble;
+        final DistributionSchedule.WriteSet writeSet;
+
+
+        LedgerEntryRequest(List<BookieId> ensemble, final long eId) {
+            this.ensemble = ensemble;
+            this.eId = eId;
+            if (clientCtx.getConf().enableReorderReadSequence) {
+                writeSet = clientCtx.getPlacementPolicy()
+                        .reorderReadSequence(
+                                ensemble,
+                                lh.getBookiesHealthInfo(),
+                                lh.getWriteSetForReadOperation(eId));
+            } else {
+                writeSet = lh.getWriteSetForReadOperation(eId);
+            }
+        }
+
+        public void close() {
+            // recycle writeSet only if this request has not completed yet, so it is never recycled twice
+            if (complete.compareAndSet(false, true)) {
+                rc = BKException.Code.UnexpectedConditionException;
+                writeSet.recycle();
+            }
+        }
+
+        /**
+         * Execute the read request.
+         */
+        abstract void read();
+
+        /**
+         * Fail the request with given result code <i>rc</i>.
+         *
+         * @param rc
+         *          result code to fail the request.
+         * @return true if we managed to fail the entry; otherwise return 
false if it already failed or completed.
+         */
+        boolean fail(int rc) {
+            if (complete.compareAndSet(false, true)) {
+                this.rc = rc;
+                writeSet.recycle();
+                submitCallback(rc);
+                return true;
+            } else {
+                return false;
+            }
+        }
+
+        /**
+         * Log error <i>errMsg</i> and reattempt read from <i>host</i>.
+         *
+         * @param bookieIndex
+         *          bookie index
+         * @param host
+         *          host that just responded
+         * @param errMsg
+         *          error msg to log
+         * @param rc
+         *          read result code
+         */
+        synchronized void logErrorAndReattemptRead(int bookieIndex, BookieId 
host, String errMsg, int rc) {
+            if (BKException.Code.OK == firstError
+                    || BKException.Code.NoSuchEntryException == firstError
+                    || BKException.Code.NoSuchLedgerExistsException == 
firstError) {
+                firstError = rc;
+            } else if (BKException.Code.BookieHandleNotAvailableException == 
firstError
+                    && BKException.Code.NoSuchEntryException != rc
+                    && BKException.Code.NoSuchLedgerExistsException != rc) {
+                // if other exception rather than NoSuchEntryException or 
NoSuchLedgerExistsException is
+                // returned we need to update firstError to indicate that it 
might be a valid read but just
+                // failed.
+                firstError = rc;
+            }
+            if (BKException.Code.NoSuchEntryException == rc
+                    || BKException.Code.NoSuchLedgerExistsException == rc) {
+                ++numBookiesMissingEntry;
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("No such entry found on bookie.  L{} E{} bookie: 
{}",
+                            lh.ledgerId, eId, host);
+                }
+            } else {
+                if (LOG.isInfoEnabled()) {
+                    LOG.info("{} while reading L{} E{} from bookie: {}",
+                            errMsg, lh.ledgerId, eId, host);
+                }
+            }
+
+            lh.recordReadErrorOnBookie(bookieIndex);
+        }
+
+        /**
+         * Send to next replica speculatively, if required and possible.
+         * This returns the host we may have sent to for unit testing.
+         *
+         * @param heardFromHostsBitSet
+         *      the set of hosts that we already received responses.
+         * @return host we sent to if we sent. null otherwise.
+         */
+        abstract BookieId maybeSendSpeculativeRead(BitSet 
heardFromHostsBitSet);
+
+        /**
+         * Whether the read request completed.
+         *
+         * @return true if the read request is completed.
+         */
+        boolean isComplete() {
+            return complete.get();
+        }
+
+        /**
+         * Get result code of this entry.
+         *
+         * @return result code.
+         */
+        int getRc() {
+            return rc;
+        }
+
+        @Override
+        public String toString() {
+            return String.format("L%d-E%d", lh.getId(), eId);
+        }
+
+        /**
+         * Submits a speculative read attempt for this entry to the main worker pool,
+         * keyed by ledger id, and indicates if more speculative requests should be issued.
+         *
+         * @return future yielding true if a speculative read was sent, false if complete or nothing was sent
+         */
+        @Override
+        public ListenableFuture<Boolean> issueSpeculativeRequest() {
+            return clientCtx.getMainWorkerPool().submitOrdered(lh.getId(), new Callable<Boolean>() {
+                @Override
+                public Boolean call() throws Exception {
+                    if (!isComplete() && null != maybeSendSpeculativeRead(heardFromHostsBitSet)) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Send speculative read for {}. Hosts sent are {}, "
+                                            + " Hosts heard are {}, ensemble is {}.",
+                                    LedgerEntryRequest.this, sentToHosts, heardFromHostsBitSet, ensemble);
+                        }
+                        return true;
+                    }
+                    return false;
+                }
+            });
+        }
+    }
+
+    protected static class ReadContext implements 
BookkeeperInternalCallbacks.ReadEntryCallbackCtx {
+        final int bookieIndex;
+        final BookieId to;
+        final PendingReadOp.LedgerEntryRequest entry;
+        long lac = LedgerHandle.INVALID_ENTRY_ID;
+
+        ReadContext(int bookieIndex, BookieId to, 
PendingReadOp.LedgerEntryRequest entry) {
+            this.bookieIndex = bookieIndex;
+            this.to = to;
+            this.entry = entry;
+        }
+
+        @Override
+        public void setLastAddConfirmed(long lac) {
+            this.lac = lac;
+        }
+
+        @Override
+        public long getLastAddConfirmed() {
+            return lac;
+        }
+    }
+}
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
index f2ada1e5dc..423e02b4aa 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
@@ -33,7 +33,6 @@ import static org.mockito.Mockito.mock;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
-import java.lang.reflect.Method;
 import java.util.Iterator;
 import java.util.List;
 import java.util.TreeMap;
@@ -268,8 +267,8 @@ public class TestParallelRead extends 
BookKeeperClusterTestCase {
         PendingReadOp pendingReadOp = new PendingReadOp(lh, clientContext, 1, 
2, false);
         pendingReadOp.parallelRead(true);
         pendingReadOp.initiate();
-        PendingReadOp.LedgerEntryRequest first = pendingReadOp.seq.get(0);
-        PendingReadOp.LedgerEntryRequest second = pendingReadOp.seq.get(1);
+        PendingReadOp.SingleLedgerEntryRequest first = 
pendingReadOp.seq.get(0);
+        PendingReadOp.SingleLedgerEntryRequest second = 
pendingReadOp.seq.get(1);
 
         pendingReadOp.submitCallback(-105);
 
@@ -287,13 +286,9 @@ public class TestParallelRead extends 
BookKeeperClusterTestCase {
         assertTrue(second.complete.get());
 
         // Mock ledgerEntryImpl reuse
-        Method method = 
PendingReadOp.class.getDeclaredMethod("createReadContext",
-                int.class, BookieId.class, 
PendingReadOp.LedgerEntryRequest.class);
-        method.setAccessible(true);
-
         ByteBuf byteBuf = Unpooled.buffer(10);
         pendingReadOp.readEntryComplete(BKException.Code.OK, 1, 1, 
Unpooled.buffer(10),
-                method.invoke(pendingReadOp, 1, BookieId.parse("test"), 
first));
+                new ReadOpBase.ReadContext(1, BookieId.parse("test"), first));
 
         // byteBuf has been release
         assertEquals(byteBuf.refCnt(), 1);
@@ -308,15 +303,15 @@ public class TestParallelRead extends 
BookKeeperClusterTestCase {
 
         // read entry failed twice, will not close twice
         
pendingReadOp.readEntryComplete(BKException.Code.TooManyRequestsException, 1, 
1, Unpooled.buffer(10),
-                method.invoke(pendingReadOp, 1, BookieId.parse("test"), 
first));
+                new ReadOpBase.ReadContext(1, BookieId.parse("test"), first));
 
         
pendingReadOp.readEntryComplete(BKException.Code.TooManyRequestsException, 1, 
1, Unpooled.buffer(10),
-                method.invoke(pendingReadOp, 1, BookieId.parse("test"), 
first));
+                new ReadOpBase.ReadContext(1, BookieId.parse("test"), first));
 
         // will not complete twice when completed
         byteBuf = Unpooled.buffer(10);
         pendingReadOp.readEntryComplete(Code.OK, 1, 1, Unpooled.buffer(10),
-                method.invoke(pendingReadOp, 1, BookieId.parse("test"), 
first));
+                new ReadOpBase.ReadContext(1, BookieId.parse("test"), first));
         assertEquals(1, byteBuf.refCnt());
 
     }

Reply via email to