This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 87ca7f3372 Fix Memory Leak In Netty Recycler of Bookie Client (#4609)
87ca7f3372 is described below

commit 87ca7f33721592a12e86e0bb7151980249aeceaa
Author: ken <[email protected]>
AuthorDate: Tue Jun 3 05:12:51 2025 +0800

    Fix Memory Leak In Netty Recycler of Bookie Client (#4609)
    
    * make recycler static to avoid OOM problem
    
    * Fixed missing license headers
    
    * Fixed checkstyle
    
    * More checkstyle
    
    ---------
    
    Co-authored-by: fanjianye <[email protected]>
    Co-authored-by: Matteo Merli <[email protected]>
---
 .../org/apache/bookkeeper/proto/AddCompletion.java | 151 ++++
 .../bookkeeper/proto/BatchedReadCompletion.java    |  98 +++
 .../org/apache/bookkeeper/proto/CompletionKey.java |  34 +
 .../apache/bookkeeper/proto/CompletionValue.java   | 179 ++++
 .../bookkeeper/proto/EntryCompletionKey.java       |  82 ++
 .../bookkeeper/proto/ForceLedgerCompletion.java    |  70 ++
 .../bookkeeper/proto/GetBookieInfoCompletion.java  |  84 ++
 .../proto/GetListOfEntriesOfLedgerCompletion.java  |  84 ++
 .../bookkeeper/proto/PerChannelBookieClient.java   | 949 +--------------------
 .../apache/bookkeeper/proto/ReadCompletion.java    | 127 +++
 .../apache/bookkeeper/proto/ReadLacCompletion.java |  87 ++
 .../bookkeeper/proto/StartTLSCompletion.java       |  75 ++
 .../apache/bookkeeper/proto/TxnCompletionKey.java  |  51 ++
 .../bookkeeper/proto/WriteLacCompletion.java       |  76 ++
 14 files changed, 1235 insertions(+), 912 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AddCompletion.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AddCompletion.java
new file mode 100644
index 0000000000..c21226dd36
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AddCompletion.java
@@ -0,0 +1,151 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.proto;
+
+import io.netty.util.Recycler;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.common.util.MathUtils;
+import org.apache.bookkeeper.net.BookieId;
+import org.slf4j.MDC;
+
+/**
+ * Recyclable completion for an add-entry request.
+ *
+ * <p>Instances come from a static Netty {@link Recycler} via {@link #acquireAddCompletion} and are
+ * handed back by {@link #release()}, which {@link #writeComplete} invokes after running the
+ * original callback.
+ */
+class AddCompletion extends CompletionValue implements BookkeeperInternalCallbacks.WriteCallback {
+
+    /**
+     * Obtains an instance from the recycler and initializes it for one add request.
+     *
+     * @param key completion key; released together with this completion in {@link #writeComplete}
+     * @param originalCallback callback to notify with the outcome of the add
+     * @param originalCtx context object handed back to {@code originalCallback}
+     * @param ledgerId ledger id of the request
+     * @param entryId entry id of the request
+     * @param perChannelBookieClient client supplying the stats loggers, timeout and bookie id
+     * @return the initialized completion
+     */
+    static AddCompletion acquireAddCompletion(final CompletionKey key,
+                                              final BookkeeperInternalCallbacks.WriteCallback originalCallback,
+                                              final Object originalCtx,
+                                              final long ledgerId, final long entryId,
+                                              PerChannelBookieClient perChannelBookieClient) {
+        AddCompletion completion = ADD_COMPLETION_RECYCLER.get();
+        completion.reset(key, originalCallback, originalCtx, ledgerId, entryId, perChannelBookieClient);
+        return completion;
+    }
+
+    final Recycler.Handle<AddCompletion> handle;
+
+    CompletionKey key = null;
+    BookkeeperInternalCallbacks.WriteCallback originalCallback = null;
+
+    /**
+     * Creates an empty instance bound to its recycler handle; {@link #reset} fills in the
+     * per-request state.
+     */
+    AddCompletion(Recycler.Handle<AddCompletion> handle) {
+        super("Add", null, -1, -1, null);
+        this.handle = handle;
+    }
+
+    /**
+     * (Re)initializes every per-request field, including the start time used for latency and
+     * timeout accounting.
+     */
+    void reset(final CompletionKey key,
+               final BookkeeperInternalCallbacks.WriteCallback originalCallback,
+               final Object originalCtx,
+               final long ledgerId, final long entryId,
+               PerChannelBookieClient perChannelBookieClient) {
+        this.key = key;
+        this.originalCallback = originalCallback;
+        this.ctx = originalCtx;
+        this.ledgerId = ledgerId;
+        this.entryId = entryId;
+        this.startTime = MathUtils.nowInNano();
+
+        this.opLogger = perChannelBookieClient.addEntryOpLogger;
+        this.timeoutOpLogger = perChannelBookieClient.addTimeoutOpLogger;
+        this.perChannelBookieClient = perChannelBookieClient;
+        this.mdcContextMap = perChannelBookieClient.preserveMdcForTaskExecution ? MDC.getCopyOfContextMap() : null;
+    }
+
+    /**
+     * Clears all per-request references and returns this instance to the recycler.
+     */
+    @Override
+    public void release() {
+        // Drop the key and the callback as well: an idle pooled instance must not keep the
+        // previous request's objects reachable.
+        this.key = null;
+        this.originalCallback = null;
+        this.ctx = null;
+        this.opLogger = null;
+        this.timeoutOpLogger = null;
+        this.perChannelBookieClient = null;
+        this.mdcContextMap = null;
+        handle.recycle(this);
+    }
+
+    /**
+     * Records the result, notifies the original callback, then releases the key and this
+     * completion. The instance must not be used after this call returns.
+     */
+    @Override
+    public void writeComplete(int rc, long ledgerId, long entryId,
+                              BookieId addr,
+                              Object ctx) {
+        logOpResult(rc);
+        originalCallback.writeComplete(rc, ledgerId, entryId, addr, ctx);
+        key.release();
+        this.release();
+    }
+
+    /**
+     * Times the request out once it has been pending for at least the client's add-entry timeout.
+     *
+     * @return {@code true} if the request was timed out
+     */
+    @Override
+    boolean maybeTimeout() {
+        if (MathUtils.elapsedNanos(startTime) >= perChannelBookieClient.addEntryTimeoutNanos) {
+            timeout();
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public void errorOut() {
+        errorOut(BKException.Code.BookieHandleNotAvailableException);
+    }
+
+    /**
+     * Schedules {@link #writeComplete} with the given error code through
+     * {@code errorOutAndRunCallback}.
+     */
+    @Override
+    public void errorOut(final int rc) {
+        errorOutAndRunCallback(
+                () -> writeComplete(rc, ledgerId, entryId, perChannelBookieClient.bookieId, ctx));
+    }
+
+    @Override
+    public void setOutstanding() {
+        perChannelBookieClient.addEntryOutstanding.inc();
+    }
+
+    @Override
+    public void handleV2Response(
+            long ledgerId, long entryId, BookkeeperProtocol.StatusCode status,
+            BookieProtocol.Response response) {
+        perChannelBookieClient.addEntryOutstanding.dec();
+        handleResponse(ledgerId, entryId, status);
+    }
+
+    @Override
+    public void handleV3Response(
+            BookkeeperProtocol.Response response) {
+        perChannelBookieClient.addEntryOutstanding.dec();
+        BookkeeperProtocol.AddResponse addResponse = response.getAddResponse();
+        // The add-specific status is only consulted when the enclosing response reports EOK.
+        BookkeeperProtocol.StatusCode status = response.getStatus() == BookkeeperProtocol.StatusCode.EOK
+                ? addResponse.getStatus() : response.getStatus();
+        handleResponse(addResponse.getLedgerId(), addResponse.getEntryId(),
+                status);
+    }
+
+    /**
+     * Converts the protocol status to a {@link BKException} code (falling back to
+     * {@code WriteException}) and completes the request.
+     */
+    private void handleResponse(long ledgerId, long entryId,
+                                BookkeeperProtocol.StatusCode status) {
+        if (LOG.isDebugEnabled()) {
+            logResponse(status, "ledger", ledgerId, "entry", entryId);
+        }
+
+        int rc = convertStatus(status, BKException.Code.WriteException);
+        writeComplete(rc, ledgerId, entryId, perChannelBookieClient.bookieId, ctx);
+    }
+
+    private static final Recycler<AddCompletion> ADD_COMPLETION_RECYCLER = new Recycler<AddCompletion>() {
+        @Override
+        protected AddCompletion newObject(Recycler.Handle<AddCompletion> handle) {
+            return new AddCompletion(handle);
+        }
+    };
+}
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BatchedReadCompletion.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BatchedReadCompletion.java
new file mode 100644
index 0000000000..c28fcc6c2d
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BatchedReadCompletion.java
@@ -0,0 +1,98 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.proto;
+
+import static org.apache.bookkeeper.client.LedgerHandle.INVALID_ENTRY_ID;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.util.ByteBufList;
+
+/**
+ * Completion for a batched read request.
+ *
+ * <p>The wrapped callback records the operation result, forwards the outcome to the original
+ * callback and then releases the completion key.
+ */
+class BatchedReadCompletion extends CompletionValue {
+
+    final BookkeeperInternalCallbacks.BatchedReadEntryCallback cb;
+
+    public BatchedReadCompletion(final CompletionKey key,
+                                 final BookkeeperInternalCallbacks.BatchedReadEntryCallback originalCallback,
+                                 final Object originalCtx,
+                                 long ledgerId, final long entryId,
+                                 PerChannelBookieClient perChannelBookieClient) {
+        super("BatchedRead", originalCtx, ledgerId, entryId, perChannelBookieClient);
+        this.opLogger = perChannelBookieClient.readEntryOpLogger;
+        this.timeoutOpLogger = perChannelBookieClient.readTimeoutOpLogger;
+        // The original callback always receives the entry id and context given to this
+        // constructor, not the ones passed to the wrapper.
+        this.cb = (resultCode, lid, firstEntryId, entries, ignoredCtx) -> {
+            logOpResult(resultCode);
+            originalCallback.readEntriesComplete(resultCode, lid, entryId, entries, originalCtx);
+            key.release();
+        };
+    }
+
+    @Override
+    public void errorOut() {
+        errorOut(BKException.Code.BookieHandleNotAvailableException);
+    }
+
+    @Override
+    public void errorOut(final int rc) {
+        errorOutAndRunCallback(() -> cb.readEntriesComplete(rc, ledgerId, entryId, null, ctx));
+    }
+
+    /**
+     * Handles a V2 response. Responses that are not a
+     * {@link BookieProtocol.BatchedReadResponse} are dropped after the outstanding counter has
+     * been decremented.
+     */
+    @Override
+    public void handleV2Response(long ledgerId,
+                                 long entryId,
+                                 BookkeeperProtocol.StatusCode status,
+                                 BookieProtocol.Response response) {
+        perChannelBookieClient.readEntryOutstanding.dec();
+        if (response instanceof BookieProtocol.BatchedReadResponse) {
+            ByteBufList data = ((BookieProtocol.BatchedReadResponse) response).getData();
+            handleBatchedReadResponse(ledgerId, entryId, status, data, INVALID_ENTRY_ID, -1L);
+        }
+    }
+
+    @Override
+    public void handleV3Response(BookkeeperProtocol.Response response) {
+        // The V3 protocol does not support batched reads yet.
+    }
+
+    /**
+     * Converts the status to a {@link BKException} code, propagates LAC information to the
+     * context when present, and invokes the wrapped callback.
+     *
+     * @param maxLAC max known LAC piggy-backed from bookies
+     * @param lacUpdateTimestamp the timestamp when the LAC was updated
+     */
+    private void handleBatchedReadResponse(long ledgerId,
+                                           long entryId,
+                                           BookkeeperProtocol.StatusCode status,
+                                           ByteBufList buffers,
+                                           long maxLAC,
+                                           long lacUpdateTimestamp) {
+        final int rc = convertStatus(status, BKException.Code.ReadException);
+
+        final boolean lacKnown = maxLAC > INVALID_ENTRY_ID;
+        if (lacKnown && ctx instanceof BookkeeperInternalCallbacks.ReadEntryCallbackCtx) {
+            BookkeeperInternalCallbacks.ReadEntryCallbackCtx readCtx =
+                    (BookkeeperInternalCallbacks.ReadEntryCallbackCtx) ctx;
+            readCtx.setLastAddConfirmed(maxLAC);
+        }
+
+        final boolean timestampKnown = lacUpdateTimestamp > -1L;
+        if (timestampKnown && ctx instanceof ReadLastConfirmedAndEntryContext) {
+            ReadLastConfirmedAndEntryContext lacCtx = (ReadLastConfirmedAndEntryContext) ctx;
+            lacCtx.setLacUpdateTimestamp(lacUpdateTimestamp);
+        }
+
+        cb.readEntriesComplete(rc, ledgerId, entryId, buffers, ctx);
+    }
+}
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/CompletionKey.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/CompletionKey.java
new file mode 100644
index 0000000000..0d543676e5
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/CompletionKey.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.proto;
+
+import org.apache.bookkeeper.proto.BookkeeperProtocol.OperationType;
+
+/**
+ * Base class for completion keys; it carries the operation type of the request.
+ */
+abstract class CompletionKey {
+    OperationType operationType;
+
+    CompletionKey(OperationType operationType) {
+        this.operationType = operationType;
+    }
+
+    /**
+     * Release hook that completions in this package call after running their callback.
+     * Does nothing by default.
+     */
+    public void release() {
+        // no-op
+    }
+}
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/CompletionValue.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/CompletionValue.java
new file mode 100644
index 0000000000..f434e0a0c5
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/CompletionValue.java
@@ -0,0 +1,179 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.proto;
+
+import com.google.common.base.Joiner;
+import io.netty.channel.Channel;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.common.util.MathUtils;
+import org.apache.bookkeeper.common.util.MdcUtils;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+/**
+ * Base class for the pending-request completions of {@link PerChannelBookieClient}.
+ *
+ * <p>It holds the state common to every request (context, ledger/entry ids, start time, stats
+ * loggers, MDC snapshot) and provides shared helpers for result logging, timeout handling and
+ * status-code conversion. Subclasses set {@link #opLogger} and {@link #timeoutOpLogger}.
+ */
+abstract class CompletionValue {
+    private final String operationName;
+    protected Object ctx;
+    protected long ledgerId;
+    protected long entryId;
+    protected long startTime;
+    protected OpStatsLogger opLogger;
+    protected OpStatsLogger timeoutOpLogger;
+    protected Map<String, String> mdcContextMap;
+    protected PerChannelBookieClient perChannelBookieClient;
+
+    static final Logger LOG = LoggerFactory.getLogger(CompletionValue.class);
+
+    /**
+     * Records the request state and the start time.
+     *
+     * @param operationName name used in log messages
+     * @param ctx caller context passed back to the callback
+     * @param ledgerId ledger id of the request
+     * @param entryId entry id of the request
+     * @param perChannelBookieClient owning client; may be {@code null}, in which case no MDC
+     *                               snapshot is taken
+     */
+    public CompletionValue(String operationName,
+                           Object ctx,
+                           long ledgerId, long entryId, PerChannelBookieClient perChannelBookieClient) {
+        this.operationName = operationName;
+        this.ctx = ctx;
+        this.ledgerId = ledgerId;
+        this.entryId = entryId;
+        this.startTime = MathUtils.nowInNano();
+        this.perChannelBookieClient = perChannelBookieClient;
+        if (perChannelBookieClient != null) {
+            this.mdcContextMap = perChannelBookieClient.preserveMdcForTaskExecution ? MDC.getCopyOfContextMap() : null;
+        }
+    }
+
+    /** Nanoseconds elapsed since {@link #startTime}. */
+    private long latency() {
+        return MathUtils.elapsedNanos(startTime);
+    }
+
+    /**
+     * Registers the request latency as a success or failure on {@link #opLogger} and reports an
+     * error to the client when {@code rc} is a failure that is not in
+     * {@code PerChannelBookieClient.EXPECTED_BK_OPERATION_ERRORS}.
+     */
+    void logOpResult(int rc) {
+        if (rc != BKException.Code.OK) {
+            opLogger.registerFailedEvent(latency(), TimeUnit.NANOSECONDS);
+        } else {
+            opLogger.registerSuccessfulEvent(latency(), TimeUnit.NANOSECONDS);
+        }
+
+        if (rc != BKException.Code.OK
+                && !PerChannelBookieClient.EXPECTED_BK_OPERATION_ERRORS.contains(rc)) {
+            perChannelBookieClient.recordError();
+        }
+    }
+
+    /**
+     * Times the request out once it has been pending for at least the client's read-entry
+     * timeout. Subclasses override this to apply a different timeout.
+     *
+     * @return {@code true} if the request was timed out
+     */
+    boolean maybeTimeout() {
+        if (MathUtils.elapsedNanos(startTime) >= perChannelBookieClient.readEntryTimeoutNanos) {
+            timeout();
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    /**
+     * Fails the request with {@code TimeoutException} and records the event on
+     * {@link #timeoutOpLogger}.
+     */
+    void timeout() {
+        // Capture what the metric needs before erroring out: errorOut() hands the callback to
+        // the ordered executor, and a subclass may clear its fields once that callback has run
+        // (AddCompletion.release() nulls timeoutOpLogger), possibly before this method resumes.
+        final OpStatsLogger timeoutLogger = timeoutOpLogger;
+        final long elapsedNanos = latency();
+        errorOut(BKException.Code.TimeoutException);
+        timeoutLogger.registerSuccessfulEvent(elapsedNanos, TimeUnit.NANOSECONDS);
+    }
+
+    /**
+     * Logs a received response at debug level.
+     *
+     * @param status status carried by the response
+     * @param extraInfo additional values, joined with {@code ':'} in the log line
+     */
+    protected void logResponse(BookkeeperProtocol.StatusCode status, Object... extraInfo) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Got {} response from bookie:{} rc:{}, {}", operationName,
+                    perChannelBookieClient.bookieId, status, Joiner.on(":").join(extraInfo));
+        }
+    }
+
+    /**
+     * Converts a protocol status into a {@link BKException} code.
+     *
+     * @param status protocol status to convert
+     * @param defaultStatus code to return (after logging an error) when {@code status} has no
+     *                      known mapping
+     * @return the mapped code, or {@code defaultStatus}
+     */
+    protected int convertStatus(BookkeeperProtocol.StatusCode status, int defaultStatus) {
+        // convert to BKException code
+        int rcToRet = statusCodeToExceptionCode(status);
+        if (rcToRet == BKException.Code.UNINITIALIZED) {
+            LOG.error("{} for failed on bookie {} code {}",
+                    operationName, perChannelBookieClient.bookieId, status);
+            return defaultStatus;
+        } else {
+            return rcToRet;
+        }
+    }
+
+    /**
+     * Maps a protocol status code to the corresponding {@link BKException} code.
+     *
+     * @param status protocol status code to map
+     * @return the mapped code, or {@link BKException.Code#UNINITIALIZED} if the status code is
+     *         unknown
+     */
+    private int statusCodeToExceptionCode(BookkeeperProtocol.StatusCode status) {
+        switch (status) {
+            case EOK:
+                return BKException.Code.OK;
+            case ENOENTRY:
+                return BKException.Code.NoSuchEntryException;
+            case ENOLEDGER:
+                return BKException.Code.NoSuchLedgerExistsException;
+            case EBADVERSION:
+                return BKException.Code.ProtocolVersionException;
+            case EUA:
+                return BKException.Code.UnauthorizedAccessException;
+            case EFENCED:
+                return BKException.Code.LedgerFencedException;
+            case EREADONLY:
+                return BKException.Code.WriteOnReadOnlyBookieException;
+            case ETOOMANYREQUESTS:
+                return BKException.Code.TooManyRequestsException;
+            case EUNKNOWNLEDGERSTATE:
+                return BKException.Code.DataUnknownException;
+            default:
+                return BKException.Code.UNINITIALIZED;
+        }
+    }
+
+    /** Restores the MDC snapshot stored in {@link #mdcContextMap}. */
+    public void restoreMdcContext() {
+        MdcUtils.restoreContext(mdcContextMap);
+    }
+
+    /** Fails the request with the subclass's default error code. */
+    public abstract void errorOut();
+
+    /** Fails the request with the given {@link BKException} code. */
+    public abstract void errorOut(int rc);
+
+    /** Hook for subclasses that track an outstanding-request counter; does nothing by default. */
+    public void setOutstanding() {
+        // no-op
+    }
+
+    /**
+     * Submits {@code callback} to the client's ordered executor, keyed by {@link #ledgerId}.
+     * A debug line naming the bookie address is logged just before the callback runs.
+     */
+    protected void errorOutAndRunCallback(final Runnable callback) {
+        perChannelBookieClient.executor.executeOrdered(ledgerId, () -> {
+            String bAddress = "null";
+            Channel c = perChannelBookieClient.channel;
+            if (c != null && c.remoteAddress() != null) {
+                bAddress = c.remoteAddress().toString();
+            }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Could not write {} request to bookie {} for ledger {}, entry {}",
+                        operationName, bAddress,
+                        ledgerId, entryId);
+            }
+            callback.run();
+        });
+    }
+
+    /**
+     * Handles a V2 protocol response. The default implementation only logs a warning.
+     */
+    public void handleV2Response(
+            long ledgerId, long entryId, BookkeeperProtocol.StatusCode status,
+            BookieProtocol.Response response) {
+        LOG.warn("Unhandled V2 response {}", response);
+    }
+
+    /** Handles a V3 protocol response. */
+    public abstract void handleV3Response(
+            BookkeeperProtocol.Response response);
+
+    /** Release hook; does nothing by default. */
+    public void release() {}
+}
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/EntryCompletionKey.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/EntryCompletionKey.java
new file mode 100644
index 0000000000..09f6413f7f
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/EntryCompletionKey.java
@@ -0,0 +1,82 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.proto;
+
+import io.netty.util.Recycler;
+
+/**
+ * Recyclable completion key identified by ledger id, entry id and operation type.
+ *
+ * <p>Instances come from a static Netty {@link Recycler} via {@link #acquireV2Key} and are handed
+ * back by {@link #release()}.
+ */
+class EntryCompletionKey extends CompletionKey {
+    private final Recycler.Handle<EntryCompletionKey> recyclerHandle;
+    long ledgerId;
+    long entryId;
+
+    /**
+     * Obtains a key from the recycler and initializes it with the given values.
+     */
+    static EntryCompletionKey acquireV2Key(long ledgerId, long entryId,
+                                           BookkeeperProtocol.OperationType operationType) {
+        EntryCompletionKey key = V2_KEY_RECYCLER.get();
+        key.reset(ledgerId, entryId, operationType);
+        return key;
+    }
+
+    private EntryCompletionKey(Recycler.Handle<EntryCompletionKey> handle) {
+        super(null);
+        this.recyclerHandle = handle;
+    }
+
+    /** Overwrites all identifying fields of this key. */
+    void reset(long ledgerId, long entryId, BookkeeperProtocol.OperationType operationType) {
+        this.ledgerId = ledgerId;
+        this.entryId = entryId;
+        this.operationType = operationType;
+    }
+
+    @Override
+    public boolean equals(Object object) {
+        if (!(object instanceof EntryCompletionKey)) {
+            return false;
+        }
+        EntryCompletionKey that = (EntryCompletionKey) object;
+        return this.entryId == that.entryId
+                && this.ledgerId == that.ledgerId
+                && this.operationType == that.operationType;
+    }
+
+    /**
+     * Hashes ledger id and entry id only; keys that differ just in operation type share a hash
+     * but are still told apart by {@link #equals}.
+     */
+    @Override
+    public int hashCode() {
+        return Long.hashCode(ledgerId) * 31 + Long.hashCode(entryId);
+    }
+
+    @Override
+    public String toString() {
+        return String.format("%d:%d %s", ledgerId, entryId, operationType);
+    }
+
+    /** Returns this key to the recycler. */
+    @Override
+    public void release() {
+        recyclerHandle.recycle(this);
+    }
+
+    private static final Recycler<EntryCompletionKey> V2_KEY_RECYCLER =
+            new Recycler<EntryCompletionKey>() {
+                @Override
+                protected EntryCompletionKey newObject(
+                        Recycler.Handle<EntryCompletionKey> handle) {
+                    return new EntryCompletionKey(handle);
+                }
+            };
+}
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerCompletion.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerCompletion.java
new file mode 100644
index 0000000000..e0717056f4
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerCompletion.java
@@ -0,0 +1,70 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.proto;
+
+import org.apache.bookkeeper.client.BKException;
+
+/**
+ * Completion for a force-ledger request.
+ *
+ * <p>The wrapped callback records the operation result, forwards the outcome to the original
+ * callback and then releases the completion key.
+ */
+class ForceLedgerCompletion extends CompletionValue {
+    final BookkeeperInternalCallbacks.ForceLedgerCallback cb;
+
+    public ForceLedgerCompletion(final CompletionKey key,
+                                 final BookkeeperInternalCallbacks.ForceLedgerCallback originalCallback,
+                                 final Object originalCtx,
+                                 final long ledgerId,
+                                 PerChannelBookieClient perChannelBookieClient) {
+        super("ForceLedger", originalCtx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, perChannelBookieClient);
+        this.opLogger = perChannelBookieClient.forceLedgerOpLogger;
+        this.timeoutOpLogger = perChannelBookieClient.forceLedgerTimeoutOpLogger;
+        // The original callback always receives the context given to this constructor.
+        this.cb = (resultCode, lid, bookie, ignoredCtx) -> {
+            logOpResult(resultCode);
+            originalCallback.forceLedgerComplete(resultCode, lid, bookie, originalCtx);
+            key.release();
+        };
+    }
+
+    @Override
+    public void errorOut() {
+        errorOut(BKException.Code.BookieHandleNotAvailableException);
+    }
+
+    @Override
+    public void errorOut(final int rc) {
+        errorOutAndRunCallback(
+                () -> cb.forceLedgerComplete(rc, ledgerId, perChannelBookieClient.bookieId, ctx));
+    }
+
+    /**
+     * Completes the request from a V3 response, falling back to {@code WriteException} when the
+     * status has no known mapping.
+     */
+    @Override
+    public void handleV3Response(BookkeeperProtocol.Response response) {
+        final BookkeeperProtocol.ForceLedgerResponse body = response.getForceLedgerResponse();
+        // The operation-specific status is only consulted when the enclosing response is EOK.
+        final BookkeeperProtocol.StatusCode envelopeStatus = response.getStatus();
+        final BookkeeperProtocol.StatusCode status;
+        if (envelopeStatus == BookkeeperProtocol.StatusCode.EOK) {
+            status = body.getStatus();
+        } else {
+            status = envelopeStatus;
+        }
+        final long respLedgerId = body.getLedgerId();
+
+        if (LOG.isDebugEnabled()) {
+            logResponse(status, "ledger", respLedgerId);
+        }
+        final int rc = convertStatus(status, BKException.Code.WriteException);
+        cb.forceLedgerComplete(rc, respLedgerId, perChannelBookieClient.bookieId, ctx);
+    }
+}
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoCompletion.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoCompletion.java
new file mode 100644
index 0000000000..e502749d79
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoCompletion.java
@@ -0,0 +1,84 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.proto;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookieInfoReader;
+import org.apache.bookkeeper.common.util.MathUtils;
+
+/**
+ * Completion for a get-bookie-info request.
+ *
+ * <p>The wrapped callback records the operation result, forwards the outcome to the original
+ * callback and then releases the completion key.
+ */
+class GetBookieInfoCompletion extends CompletionValue {
+    final BookkeeperInternalCallbacks.GetBookieInfoCallback cb;
+
+    public GetBookieInfoCompletion(final CompletionKey key,
+                                   final BookkeeperInternalCallbacks.GetBookieInfoCallback origCallback,
+                                   final Object origCtx,
+                                   PerChannelBookieClient perChannelBookieClient) {
+        super("GetBookieInfo", origCtx, 0L, 0L, perChannelBookieClient);
+        this.opLogger = perChannelBookieClient.getBookieInfoOpLogger;
+        this.timeoutOpLogger = perChannelBookieClient.getBookieInfoTimeoutOpLogger;
+        // The original callback always receives the context given to this constructor.
+        this.cb = (resultCode, info, ignoredCtx) -> {
+            logOpResult(resultCode);
+            origCallback.getBookieInfoComplete(resultCode, info, origCtx);
+            key.release();
+        };
+    }
+
+    /**
+     * Times the request out once it has been pending for at least the client's
+     * get-bookie-info timeout.
+     *
+     * @return {@code true} if the request was timed out
+     */
+    @Override
+    boolean maybeTimeout() {
+        final long elapsedNanos = MathUtils.elapsedNanos(startTime);
+        if (elapsedNanos < perChannelBookieClient.getBookieInfoTimeoutNanos) {
+            return false;
+        }
+        timeout();
+        return true;
+    }
+
+    @Override
+    public void errorOut() {
+        errorOut(BKException.Code.BookieHandleNotAvailableException);
+    }
+
+    /**
+     * Schedules the callback with the given error code and an empty
+     * {@link BookieInfoReader.BookieInfo}.
+     */
+    @Override
+    public void errorOut(final int rc) {
+        errorOutAndRunCallback(
+                () -> cb.getBookieInfoComplete(rc, new BookieInfoReader.BookieInfo(), ctx));
+    }
+
+    /**
+     * Completes the request from a V3 response, falling back to {@code ReadException} when the
+     * status has no known mapping.
+     */
+    @Override
+    public void handleV3Response(BookkeeperProtocol.Response response) {
+        final BookkeeperProtocol.GetBookieInfoResponse body = response.getGetBookieInfoResponse();
+        // The operation-specific status is only consulted when the enclosing response is EOK.
+        final BookkeeperProtocol.StatusCode envelopeStatus = response.getStatus();
+        final BookkeeperProtocol.StatusCode status;
+        if (envelopeStatus == BookkeeperProtocol.StatusCode.EOK) {
+            status = body.getStatus();
+        } else {
+            status = envelopeStatus;
+        }
+
+        final long freeBytes = body.getFreeDiskSpace();
+        final long totalBytes = body.getTotalDiskCapacity();
+
+        if (LOG.isDebugEnabled()) {
+            logResponse(status, "freeDisk", freeBytes, "totalDisk", totalBytes);
+        }
+
+        final int rc = convertStatus(status, BKException.Code.ReadException);
+        final BookieInfoReader.BookieInfo info = new BookieInfoReader.BookieInfo(totalBytes, freeBytes);
+        cb.getBookieInfoComplete(rc, info, ctx);
+    }
+}
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetListOfEntriesOfLedgerCompletion.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetListOfEntriesOfLedgerCompletion.java
new file mode 100644
index 0000000000..fea19b83d3
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetListOfEntriesOfLedgerCompletion.java
@@ -0,0 +1,84 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.proto;
+
+import static 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetListOfEntriesOfLedgerCallback;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.util.AvailabilityOfEntriesOfLedger;
+
+class GetListOfEntriesOfLedgerCompletion extends CompletionValue { // Completion for a GetListOfEntriesOfLedger request; routes the outcome to the caller's callback.
+    final BookkeeperInternalCallbacks.GetListOfEntriesOfLedgerCallback cb; // wrapper around origCallback, assigned in the constructor
+
+    public GetListOfEntriesOfLedgerCompletion(final CompletionKey key,
+                                              final 
GetListOfEntriesOfLedgerCallback origCallback,
+                                              final long ledgerId,
+                                              PerChannelBookieClient 
perChannelBookieClient) {
+        super("GetListOfEntriesOfLedger", null, ledgerId, 0L, 
perChannelBookieClient); // NOTE(review): null and 0L are presumably the ctx and entryId arguments — confirm against CompletionValue's constructor
+        this.opLogger = 
perChannelBookieClient.getListOfEntriesOfLedgerCompletionOpLogger;
+        this.timeoutOpLogger = 
perChannelBookieClient.getListOfEntriesOfLedgerCompletionTimeoutOpLogger;
+        this.cb = (rc, ledgerId1, availabilityOfEntriesOfLedger) -> { // order: record the result, notify the caller, then release the key
+            logOpResult(rc);
+            origCallback.getListOfEntriesOfLedgerComplete(rc, ledgerId1, 
availabilityOfEntriesOfLedger);
+            key.release();
+        };
+    }
+
+    @Override // No-arg variant: delegates to errorOut(int) with BookieHandleNotAvailableException as the result code.
+    public void errorOut() {
+        errorOut(BKException.Code.BookieHandleNotAvailableException);
+    }
+
+    @Override // Reports the failure code rc to cb by handing a callback runnable to errorOutAndRunCallback.
+    public void errorOut(final int rc) {
+        errorOutAndRunCallback(() -> cb.getListOfEntriesOfLedgerComplete(rc, 
ledgerId, null)); // null is passed as the availability argument on this path
+    }
+
+    @Override // Reads the GetListOfEntriesOfLedger payload of a v3 response and passes the converted result to cb.
+    public void handleV3Response(BookkeeperProtocol.Response response) {
+        BookkeeperProtocol.GetListOfEntriesOfLedgerResponse 
getListOfEntriesOfLedgerResponse = response
+                .getGetListOfEntriesOfLedgerResponse();
+        ByteBuf availabilityOfEntriesOfLedgerBuffer = Unpooled.EMPTY_BUFFER; // stays empty when the response has no availability payload
+        BookkeeperProtocol.StatusCode status =
+                response.getStatus() == BookkeeperProtocol.StatusCode.EOK ? 
getListOfEntriesOfLedgerResponse.getStatus()
+                        : response.getStatus(); // the response-level status is used unless it is EOK; then the operation-level status is used
+
+        if 
(getListOfEntriesOfLedgerResponse.hasAvailabilityOfEntriesOfLedger()) {
+            availabilityOfEntriesOfLedgerBuffer = Unpooled.wrappedBuffer(
+                    
getListOfEntriesOfLedgerResponse.getAvailabilityOfEntriesOfLedger().asReadOnlyByteBuffer());
+        }
+
+        if (LOG.isDebugEnabled()) {
+            logResponse(status, "ledgerId", ledgerId);
+        }
+
+        int rc = convertStatus(status, BKException.Code.ReadException); // ReadException is handed to convertStatus as the default code
+        AvailabilityOfEntriesOfLedger availabilityOfEntriesOfLedger = null; // remains null unless rc is OK
+        if (rc == BKException.Code.OK) {
+            availabilityOfEntriesOfLedger = new AvailabilityOfEntriesOfLedger(
+                    availabilityOfEntriesOfLedgerBuffer.slice());
+        }
+        cb.getListOfEntriesOfLedgerComplete(rc, ledgerId, 
availabilityOfEntriesOfLedger);
+    }
+}
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 019aeb0637..40712bf854 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -18,9 +18,6 @@
  */
 package org.apache.bookkeeper.proto;
 
-import static org.apache.bookkeeper.client.LedgerHandle.INVALID_ENTRY_ID;
-
-import com.google.common.base.Joiner;
 import com.google.common.collect.Sets;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.ExtensionRegistry;
@@ -28,7 +25,6 @@ import com.google.protobuf.UnsafeByteOperations;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.Unpooled;
 import io.netty.buffer.UnpooledByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
@@ -61,7 +57,6 @@ import io.netty.incubator.channel.uring.IOUringEventLoopGroup;
 import io.netty.incubator.channel.uring.IOUringSocketChannel;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
-import io.netty.util.ReferenceCountUtil;
 import io.netty.util.ReferenceCounted;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
@@ -95,7 +90,6 @@ import org.apache.bookkeeper.auth.BookKeeperPrincipal;
 import org.apache.bookkeeper.auth.ClientAuthProvider;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeperClientStats;
-import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
 import org.apache.bookkeeper.client.api.WriteFlag;
 import org.apache.bookkeeper.common.util.MathUtils;
 import org.apache.bookkeeper.common.util.MdcUtils;
@@ -109,31 +103,22 @@ import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
 import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetListOfEntriesOfLedgerCallback;
 import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
-import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallbackCtx;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
-import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.StartTLSCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.ForceLedgerRequest;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.ForceLedgerResponse;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.GetBookieInfoRequest;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.GetBookieInfoResponse;
 import 
org.apache.bookkeeper.proto.BookkeeperProtocol.GetListOfEntriesOfLedgerRequest;
-import 
org.apache.bookkeeper.proto.BookkeeperProtocol.GetListOfEntriesOfLedgerResponse;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.OperationType;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.ProtocolVersion;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacResponse;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadRequest;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacResponse;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.OpStatsLogger;
@@ -142,7 +127,6 @@ import org.apache.bookkeeper.stats.annotations.StatsDoc;
 import org.apache.bookkeeper.tls.SecurityException;
 import org.apache.bookkeeper.tls.SecurityHandlerFactory;
 import org.apache.bookkeeper.tls.SecurityHandlerFactory.NodeType;
-import org.apache.bookkeeper.util.AvailabilityOfEntriesOfLedger;
 import org.apache.bookkeeper.util.ByteBufList;
 import org.apache.bookkeeper.util.StringUtils;
 import org.apache.bookkeeper.util.collections.ConcurrentOpenHashMap;
@@ -165,7 +149,7 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
     static final Logger LOG = 
LoggerFactory.getLogger(PerChannelBookieClient.class);
 
     // this set contains the bookie error return codes that we do not consider 
for a bookie to be "faulty"
-    private static final Set<Integer> expectedBkOperationErrors = 
Collections.unmodifiableSet(Sets
+    protected static final Set<Integer> EXPECTED_BK_OPERATION_ERRORS = 
Collections.unmodifiableSet(Sets
             .newHashSet(BKException.Code.BookieHandleNotAvailableException,
                         BKException.Code.NoSuchEntryException,
                         BKException.Code.NoSuchLedgerExistsException,
@@ -201,79 +185,79 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
         name = BookKeeperClientStats.CHANNEL_READ_OP,
         help = "channel stats of read entries requests"
     )
-    private final OpStatsLogger readEntryOpLogger;
+    protected final OpStatsLogger readEntryOpLogger;
     @StatsDoc(
         name = BookKeeperClientStats.CHANNEL_TIMEOUT_READ,
         help = "timeout stats of read entries requests"
     )
-    private final OpStatsLogger readTimeoutOpLogger;
+    protected final OpStatsLogger readTimeoutOpLogger;
     @StatsDoc(
         name = BookKeeperClientStats.CHANNEL_ADD_OP,
         help = "channel stats of add entries requests"
     )
-    private final OpStatsLogger addEntryOpLogger;
+    protected final OpStatsLogger addEntryOpLogger;
     @StatsDoc(
         name = BookKeeperClientStats.CHANNEL_WRITE_LAC_OP,
         help = "channel stats of write_lac requests"
     )
-    private final OpStatsLogger writeLacOpLogger;
+    protected final OpStatsLogger writeLacOpLogger;
     @StatsDoc(
         name = BookKeeperClientStats.CHANNEL_FORCE_OP,
         help = "channel stats of force requests"
     )
-    private final OpStatsLogger forceLedgerOpLogger;
+    protected final OpStatsLogger forceLedgerOpLogger;
     @StatsDoc(
         name = BookKeeperClientStats.CHANNEL_READ_LAC_OP,
         help = "channel stats of read_lac requests"
     )
-    private final OpStatsLogger readLacOpLogger;
+    protected final OpStatsLogger readLacOpLogger;
     @StatsDoc(
         name = BookKeeperClientStats.CHANNEL_TIMEOUT_ADD,
         help = "timeout stats of add entries requests"
     )
-    private final OpStatsLogger addTimeoutOpLogger;
+    protected final OpStatsLogger addTimeoutOpLogger;
     @StatsDoc(
         name = BookKeeperClientStats.CHANNEL_TIMEOUT_WRITE_LAC,
         help = "timeout stats of write_lac requests"
     )
-    private final OpStatsLogger writeLacTimeoutOpLogger;
+    protected final OpStatsLogger writeLacTimeoutOpLogger;
     @StatsDoc(
         name = BookKeeperClientStats.CHANNEL_TIMEOUT_FORCE,
         help = "timeout stats of force requests"
     )
-    private final OpStatsLogger forceLedgerTimeoutOpLogger;
+    protected final OpStatsLogger forceLedgerTimeoutOpLogger;
     @StatsDoc(
         name = BookKeeperClientStats.CHANNEL_TIMEOUT_READ_LAC,
         help = "timeout stats of read_lac requests"
     )
-    private final OpStatsLogger readLacTimeoutOpLogger;
+    protected final OpStatsLogger readLacTimeoutOpLogger;
     @StatsDoc(
         name = BookKeeperClientStats.GET_BOOKIE_INFO_OP,
         help = "channel stats of get_bookie_info requests"
     )
-    private final OpStatsLogger getBookieInfoOpLogger;
+    protected final OpStatsLogger getBookieInfoOpLogger;
     @StatsDoc(
         name = BookKeeperClientStats.TIMEOUT_GET_BOOKIE_INFO,
         help = "timeout stats of get_bookie_info requests"
     )
-    private final OpStatsLogger getBookieInfoTimeoutOpLogger;
+    protected final OpStatsLogger getBookieInfoTimeoutOpLogger;
     @StatsDoc(
         name = BookKeeperClientStats.CHANNEL_START_TLS_OP,
         help = "channel stats of start_tls requests"
     )
-    private final OpStatsLogger startTLSOpLogger;
+    protected final OpStatsLogger startTLSOpLogger;
     @StatsDoc(
         name = BookKeeperClientStats.CHANNEL_TIMEOUT_START_TLS_OP,
         help = "timeout stats of start_tls requests"
     )
-    private final OpStatsLogger startTLSTimeoutOpLogger;
+    protected final OpStatsLogger startTLSTimeoutOpLogger;
     @StatsDoc(
         name = BookKeeperClientStats.CLIENT_CONNECT_TIMER,
         help = "channel stats of connect requests"
     )
     private final OpStatsLogger connectTimer;
-    private final OpStatsLogger getListOfEntriesOfLedgerCompletionOpLogger;
-    private final OpStatsLogger 
getListOfEntriesOfLedgerCompletionTimeoutOpLogger;
+    protected final OpStatsLogger getListOfEntriesOfLedgerCompletionOpLogger;
+    protected final OpStatsLogger 
getListOfEntriesOfLedgerCompletionTimeoutOpLogger;
     @StatsDoc(
         name = BookKeeperClientStats.NETTY_EXCEPTION_CNT,
         help = "the number of exceptions received from this channel"
@@ -283,12 +267,12 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
         name = BookKeeperClientStats.ADD_OP_OUTSTANDING,
         help = "the number of outstanding add_entry requests"
     )
-    private final Counter addEntryOutstanding;
+    protected final Counter addEntryOutstanding;
     @StatsDoc(
         name = BookKeeperClientStats.READ_OP_OUTSTANDING,
         help = "the number of outstanding add_entry requests"
     )
-    private final Counter readEntryOutstanding;
+    protected final Counter readEntryOutstanding;
     /* collect stats on all Ops that flows through netty pipeline */
     @StatsDoc(
         name = BookKeeperClientStats.NETTY_OPS,
@@ -317,7 +301,7 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
     private final Counter failedTlsHandshakeCounter;
 
     private final boolean useV2WireProtocol;
-    private final boolean preserveMdcForTaskExecution;
+    protected final boolean preserveMdcForTaskExecution;
 
     /**
      * The following member variables do not need to be concurrent, or volatile
@@ -343,7 +327,7 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
 
     volatile ConnectionState state;
     final ReentrantReadWriteLock closeLock = new ReentrantReadWriteLock();
-    private final ClientConfiguration conf;
+    protected final ClientConfiguration conf;
 
     private final PerChannelBookieClientPool pcbcPool;
     private final ClientAuthProvider.Factory authProviderFactory;
@@ -698,7 +682,7 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
         // writeLac is mostly like addEntry hence uses addEntryTimeout
         completionObjects.put(completionKey,
                               new WriteLacCompletion(completionKey, cb,
-                                                     ctx, ledgerId));
+                                                     ctx, ledgerId, this));
 
         // Build the request
         BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
@@ -737,7 +721,7 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
         // force is mostly like addEntry hence uses addEntryTimeout
         completionObjects.put(completionKey,
                               new ForceLedgerCompletion(completionKey, cb,
-                                                     ctx, ledgerId));
+                                                     ctx, ledgerId, this));
 
         // Build the request
         BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
@@ -787,7 +771,7 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
                 cb.writeComplete(BKException.Code.IllegalOpException, 
ledgerId, entryId, bookieId, ctx);
                 return;
             }
-            completionKey = acquireV2Key(ledgerId, entryId, 
OperationType.ADD_ENTRY);
+            completionKey = EntryCompletionKey.acquireV2Key(ledgerId, entryId, 
OperationType.ADD_ENTRY);
 
             if (toSend instanceof ByteBuf) {
                 ByteBuf byteBuf = ((ByteBuf) toSend).retainedDuplicate();
@@ -839,8 +823,8 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
         }
 
         putCompletionKeyValue(completionKey,
-                              acquireAddCompletion(completionKey,
-                                                   cb, ctx, ledgerId, 
entryId));
+                              AddCompletion.acquireAddCompletion(completionKey,
+                                                   cb, ctx, ledgerId, entryId, 
this));
         // addEntry times out on backpressure
         writeAndFlush(channel, completionKey, request, allowFastFail, 
cleanupActionFailedBeforeWrite,
                 cleanupActionAfterWrite);
@@ -852,7 +836,7 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
         if (useV2WireProtocol) {
             request = 
BookieProtocol.ReadRequest.create(BookieProtocol.CURRENT_PROTOCOL_VERSION,
                                                      ledgerId, 0, (short) 0, 
null);
-            completionKey = acquireV2Key(ledgerId, 0, OperationType.READ_LAC);
+            completionKey = EntryCompletionKey.acquireV2Key(ledgerId, 0, 
OperationType.READ_LAC);
         } else {
             final long txnId = getTxnId();
             completionKey = new TxnCompletionKey(txnId, 
OperationType.READ_LAC);
@@ -871,14 +855,15 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
         }
         putCompletionKeyValue(completionKey,
                               new ReadLacCompletion(completionKey, cb,
-                                                    ctx, ledgerId));
+                                                    ctx, ledgerId, this));
         writeAndFlush(channel, completionKey, request);
     }
 
     public void getListOfEntriesOfLedger(final long ledgerId, 
GetListOfEntriesOfLedgerCallback cb) {
         final long txnId = getTxnId();
         final CompletionKey completionKey = new TxnCompletionKey(txnId, 
OperationType.GET_LIST_OF_ENTRIES_OF_LEDGER);
-        completionObjects.put(completionKey, new 
GetListOfEntriesOfLedgerCompletion(completionKey, cb, ledgerId));
+        completionObjects.put(completionKey, new 
GetListOfEntriesOfLedgerCompletion(
+                completionKey, cb, ledgerId, this));
 
         // Build the request.
         BKPacketHeader.Builder headerBuilder = 
BKPacketHeader.newBuilder().setVersion(ProtocolVersion.VERSION_THREE)
@@ -936,7 +921,7 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
         if (useV2WireProtocol) {
             request = 
BookieProtocol.ReadRequest.create(BookieProtocol.CURRENT_PROTOCOL_VERSION,
                     ledgerId, entryId, (short) flags, masterKey);
-            completionKey = acquireV2Key(ledgerId, entryId, 
OperationType.READ_ENTRY);
+            completionKey = EntryCompletionKey.acquireV2Key(ledgerId, entryId, 
OperationType.READ_ENTRY);
         } else {
             final long txnId = getTxnId();
             completionKey = new TxnCompletionKey(txnId, 
OperationType.READ_ENTRY);
@@ -995,7 +980,7 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
                     .build();
         }
 
-        ReadCompletion readCompletion = new ReadCompletion(completionKey, cb, 
ctx, ledgerId, entryId);
+        ReadCompletion readCompletion = new ReadCompletion(completionKey, cb, 
ctx, ledgerId, entryId, this);
         putCompletionKeyValue(completionKey, readCompletion);
 
         writeAndFlush(channel, completionKey, request, allowFastFail, null, 
null);
@@ -1038,7 +1023,7 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
             throw new UnsupportedOperationException("Unsupported batch read 
entry operation for v3 protocol.");
         }
         BatchedReadCompletion readCompletion = new BatchedReadCompletion(
-                completionKey, cb, ctx, ledgerId, startEntryId);
+                completionKey, cb, ctx, ledgerId, startEntryId, this);
         putCompletionKeyValue(completionKey, readCompletion);
 
         writeAndFlush(channel, completionKey, request, allowFastFail, null, 
null);
@@ -1049,7 +1034,7 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
         final CompletionKey completionKey = new TxnCompletionKey(txnId, 
OperationType.GET_BOOKIE_INFO);
         completionObjects.put(completionKey,
                               new GetBookieInfoCompletion(
-                                      completionKey, cb, ctx));
+                                      completionKey, cb, ctx, this));
 
         // Build the request and calculate the total size to be included in 
the packet.
         BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
@@ -1413,7 +1398,7 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
         if (OperationType.BATCH_READ_ENTRY == operationType) {
             key = new TxnCompletionKey(((BookieProtocol.BatchedReadResponse) 
response).getRequestId(), operationType);
         } else {
-            key = acquireV2Key(response.ledgerId, response.entryId, 
operationType);
+            key = EntryCompletionKey.acquireV2Key(response.ledgerId, 
response.entryId, operationType);
         }
         CompletionValue completionValue = getCompletionValue(key);
         key.release();
@@ -1656,813 +1641,11 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
             });
     }
 
-    /**
-     * Boiler-plate wrapper classes follow.
-     *
-     */
-
-    // visible for testing
-    abstract class CompletionValue {
-        private final OpStatsLogger opLogger;
-        private final OpStatsLogger timeoutOpLogger;
-        private final String operationName;
-        private final Map<String, String> mdcContextMap;
-        protected Object ctx;
-        protected long ledgerId;
-        protected long entryId;
-        protected long startTime;
-
-        public CompletionValue(String operationName,
-                               Object ctx,
-                               long ledgerId, long entryId,
-                               OpStatsLogger opLogger,
-                               OpStatsLogger timeoutOpLogger) {
-            this.operationName = operationName;
-            this.ctx = ctx;
-            this.ledgerId = ledgerId;
-            this.entryId = entryId;
-            this.startTime = MathUtils.nowInNano();
-            this.opLogger = opLogger;
-            this.timeoutOpLogger = timeoutOpLogger;
-            this.mdcContextMap = preserveMdcForTaskExecution ? 
MDC.getCopyOfContextMap() : null;
-        }
-
-        private long latency() {
-            return MathUtils.elapsedNanos(startTime);
-        }
-
-        void logOpResult(int rc) {
-            if (rc != BKException.Code.OK) {
-                opLogger.registerFailedEvent(latency(), TimeUnit.NANOSECONDS);
-            } else {
-                opLogger.registerSuccessfulEvent(latency(), 
TimeUnit.NANOSECONDS);
-            }
-
-            if (rc != BKException.Code.OK
-                && !expectedBkOperationErrors.contains(rc)) {
-                recordError();
-            }
-        }
-
-        boolean maybeTimeout() {
-            if (MathUtils.elapsedNanos(startTime) >= readEntryTimeoutNanos) {
-                timeout();
-                return true;
-            } else {
-                return false;
-            }
-        }
-
-        void timeout() {
-            errorOut(BKException.Code.TimeoutException);
-            timeoutOpLogger.registerSuccessfulEvent(latency(),
-                                                    TimeUnit.NANOSECONDS);
-        }
-
-        protected void logResponse(StatusCode status, Object... extraInfo) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Got {} response from bookie:{} rc:{}, {}", 
operationName, bookieId, status,
-                        Joiner.on(":").join(extraInfo));
-            }
-        }
-
-        protected int convertStatus(StatusCode status, int defaultStatus) {
-            // convert to BKException code
-            int rcToRet = statusCodeToExceptionCode(status);
-            if (rcToRet == BKException.Code.UNINITIALIZED) {
-                LOG.error("{} for failed on bookie {} code {}",
-                          operationName, bookieId, status);
-                return defaultStatus;
-            } else {
-                return rcToRet;
-            }
-        }
-
-        public void restoreMdcContext() {
-            MdcUtils.restoreContext(mdcContextMap);
-        }
-
-        public abstract void errorOut();
-        public abstract void errorOut(int rc);
-        public void setOutstanding() {
-            // no-op
-        }
-
-        protected void errorOutAndRunCallback(final Runnable callback) {
-            executor.executeOrdered(ledgerId, () -> {
-                String bAddress = "null";
-                Channel c = channel;
-                if (c != null && c.remoteAddress() != null) {
-                    bAddress = c.remoteAddress().toString();
-                }
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Could not write {} request to bookie {} for 
ledger {}, entry {}",
-                            operationName, bAddress,
-                            ledgerId, entryId);
-                }
-                callback.run();
-            });
-        }
-
-        public void handleV2Response(
-                long ledgerId, long entryId, StatusCode status,
-                BookieProtocol.Response response) {
-            LOG.warn("Unhandled V2 response {}", response);
-        }
-
-        public abstract void handleV3Response(
-                BookkeeperProtocol.Response response);
-    }
-
-    // visible for testing
-    class WriteLacCompletion extends CompletionValue {
-        final WriteLacCallback cb;
-
-        public WriteLacCompletion(final CompletionKey key,
-                                  final WriteLacCallback originalCallback,
-                                  final Object originalCtx,
-                                  final long ledgerId) {
-            super("WriteLAC",
-                  originalCtx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED,
-                  writeLacOpLogger, writeLacTimeoutOpLogger);
-            this.cb = new WriteLacCallback() {
-                    @Override
-                    public void writeLacComplete(int rc, long ledgerId,
-                                                 BookieId addr,
-                                                 Object ctx) {
-                        logOpResult(rc);
-                        originalCallback.writeLacComplete(rc, ledgerId,
-                                                          addr, originalCtx);
-                        key.release();
-                    }
-                };
-        }
-
-        @Override
-        public void errorOut() {
-            errorOut(BKException.Code.BookieHandleNotAvailableException);
-        }
-
-        @Override
-        public void errorOut(final int rc) {
-            errorOutAndRunCallback(
-                    () -> cb.writeLacComplete(rc, ledgerId, bookieId, ctx));
-        }
-
-        @Override
-        public void handleV3Response(BookkeeperProtocol.Response response) {
-            WriteLacResponse writeLacResponse = response.getWriteLacResponse();
-            StatusCode status = response.getStatus() == StatusCode.EOK
-                ? writeLacResponse.getStatus() : response.getStatus();
-            long ledgerId = writeLacResponse.getLedgerId();
-
-            if (LOG.isDebugEnabled()) {
-                logResponse(status, "ledger", ledgerId);
-            }
-            int rc = convertStatus(status, BKException.Code.WriteException);
-            cb.writeLacComplete(rc, ledgerId, bookieId, ctx);
-        }
-    }
-
-    class ForceLedgerCompletion extends CompletionValue {
-        final ForceLedgerCallback cb;
-
-        public ForceLedgerCompletion(final CompletionKey key,
-                                  final ForceLedgerCallback originalCallback,
-                                  final Object originalCtx,
-                                  final long ledgerId) {
-            super("ForceLedger",
-                  originalCtx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED,
-                  forceLedgerOpLogger, forceLedgerTimeoutOpLogger);
-            this.cb = new ForceLedgerCallback() {
-                    @Override
-                    public void forceLedgerComplete(int rc, long ledgerId,
-                                                 BookieId addr,
-                                                 Object ctx) {
-                        logOpResult(rc);
-                        originalCallback.forceLedgerComplete(rc, ledgerId,
-                                                          addr, originalCtx);
-                        key.release();
-                    }
-                };
-        }
-
-        @Override
-        public void errorOut() {
-            errorOut(BKException.Code.BookieHandleNotAvailableException);
-        }
-
-        @Override
-        public void errorOut(final int rc) {
-            errorOutAndRunCallback(
-                    () -> cb.forceLedgerComplete(rc, ledgerId, bookieId, ctx));
-        }
-
-        @Override
-        public void handleV3Response(BookkeeperProtocol.Response response) {
-            ForceLedgerResponse forceLedgerResponse = 
response.getForceLedgerResponse();
-            StatusCode status = response.getStatus() == StatusCode.EOK
-                ? forceLedgerResponse.getStatus() : response.getStatus();
-            long ledgerId = forceLedgerResponse.getLedgerId();
-
-            if (LOG.isDebugEnabled()) {
-                logResponse(status, "ledger", ledgerId);
-            }
-            int rc = convertStatus(status, BKException.Code.WriteException);
-            cb.forceLedgerComplete(rc, ledgerId, bookieId, ctx);
-        }
-    }
-
-    // visible for testing
-    class ReadLacCompletion extends CompletionValue {
-        final ReadLacCallback cb;
-
-        public ReadLacCompletion(final CompletionKey key,
-                                 ReadLacCallback originalCallback,
-                                 final Object ctx, final long ledgerId) {
-            super("ReadLAC", ctx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED,
-                  readLacOpLogger, readLacTimeoutOpLogger);
-            this.cb = new ReadLacCallback() {
-                    @Override
-                    public void readLacComplete(int rc, long ledgerId,
-                                                ByteBuf lacBuffer,
-                                                ByteBuf lastEntryBuffer,
-                                                Object ctx) {
-                        logOpResult(rc);
-                        originalCallback.readLacComplete(
-                                rc, ledgerId, lacBuffer, lastEntryBuffer, ctx);
-                        key.release();
-                    }
-                };
-        }
-
-        @Override
-        public void errorOut() {
-            errorOut(BKException.Code.BookieHandleNotAvailableException);
-        }
-
-        @Override
-        public void errorOut(final int rc) {
-            errorOutAndRunCallback(
-                    () -> cb.readLacComplete(rc, ledgerId, null, null, ctx));
-        }
-
-        @Override
-        public void handleV3Response(BookkeeperProtocol.Response response) {
-            ReadLacResponse readLacResponse = response.getReadLacResponse();
-            ByteBuf lacBuffer = Unpooled.EMPTY_BUFFER;
-            ByteBuf lastEntryBuffer = Unpooled.EMPTY_BUFFER;
-            StatusCode status = response.getStatus() == StatusCode.EOK
-                ? readLacResponse.getStatus() : response.getStatus();
-
-            if (readLacResponse.hasLacBody()) {
-                lacBuffer = Unpooled.wrappedBuffer(readLacResponse.getLacBody().asReadOnlyByteBuffer());
-            }
-
-            if (readLacResponse.hasLastEntryBody()) {
-                lastEntryBuffer = Unpooled.wrappedBuffer(readLacResponse.getLastEntryBody().asReadOnlyByteBuffer());
-            }
-
-            if (LOG.isDebugEnabled()) {
-                logResponse(status, "ledgerId", ledgerId);
-            }
-
-            int rc = convertStatus(status, BKException.Code.ReadException);
-            cb.readLacComplete(rc, ledgerId, lacBuffer.slice(),
-                               lastEntryBuffer.slice(), ctx);
-        }
-    }
-
-    // visible for testing
-    class ReadCompletion extends CompletionValue {
-        final ReadEntryCallback cb;
-
-        public ReadCompletion(final CompletionKey key,
-                              final ReadEntryCallback originalCallback,
-                              final Object originalCtx,
-                              long ledgerId, final long entryId) {
-            super("Read", originalCtx, ledgerId, entryId,
-                  readEntryOpLogger, readTimeoutOpLogger);
-
-            this.cb = new ReadEntryCallback() {
-                    @Override
-                    public void readEntryComplete(int rc, long ledgerId,
-                                                  long entryId, ByteBuf buffer,
-                                                  Object ctx) {
-                        logOpResult(rc);
-                        originalCallback.readEntryComplete(rc,
-                                                           ledgerId, entryId,
-                                                           buffer, originalCtx);
-                        key.release();
-                    }
-                };
-        }
-
-        @Override
-        public void errorOut() {
-            errorOut(BKException.Code.BookieHandleNotAvailableException);
-        }
-
-        @Override
-        public void errorOut(final int rc) {
-            errorOutAndRunCallback(
-                    () -> cb.readEntryComplete(rc, ledgerId,
-                                               entryId, null, ctx));
-        }
-
-        @Override
-        public void setOutstanding() {
-            readEntryOutstanding.inc();
-        }
-
-        @Override
-        public void handleV2Response(long ledgerId, long entryId,
-                                     StatusCode status,
-                                     BookieProtocol.Response response) {
-            readEntryOutstanding.dec();
-            if (!(response instanceof BookieProtocol.ReadResponse)) {
-                return;
-            }
-            BookieProtocol.ReadResponse readResponse = (BookieProtocol.ReadResponse) response;
-            handleReadResponse(ledgerId, entryId, status, readResponse.getData(),
-                               INVALID_ENTRY_ID, -1L);
-        }
-
-        @Override
-        public void handleV3Response(BookkeeperProtocol.Response response) {
-            readEntryOutstanding.dec();
-            ReadResponse readResponse = response.getReadResponse();
-            StatusCode status = response.getStatus() == StatusCode.EOK
-                ? readResponse.getStatus() : response.getStatus();
-            ByteBuf buffer = Unpooled.EMPTY_BUFFER;
-            if (readResponse.hasBody()) {
-                buffer = Unpooled.wrappedBuffer(readResponse.getBody().asReadOnlyByteBuffer());
-            }
-            long maxLAC = INVALID_ENTRY_ID;
-            if (readResponse.hasMaxLAC()) {
-                maxLAC = readResponse.getMaxLAC();
-            }
-            long lacUpdateTimestamp = -1L;
-            if (readResponse.hasLacUpdateTimestamp()) {
-                lacUpdateTimestamp = readResponse.getLacUpdateTimestamp();
-            }
-            handleReadResponse(readResponse.getLedgerId(),
-                               readResponse.getEntryId(),
-                               status, buffer, maxLAC, lacUpdateTimestamp);
-            ReferenceCountUtil.release(
-                    buffer); // meaningless using unpooled, but client may expect to hold the last reference
-        }
-
-        private void handleReadResponse(long ledgerId,
-                                        long entryId,
-                                        StatusCode status,
-                                        ByteBuf buffer,
-                                        long maxLAC, // max known lac piggy-back from bookies
-                                        long lacUpdateTimestamp) { // the timestamp when the lac is updated.
-            int readableBytes = buffer.readableBytes();
-            if (LOG.isDebugEnabled()) {
-                logResponse(status, "ledger", ledgerId, "entry", entryId, "entryLength", readableBytes);
-            }
-
-            int rc = convertStatus(status, BKException.Code.ReadException);
-
-            if (maxLAC > INVALID_ENTRY_ID && (ctx instanceof ReadEntryCallbackCtx)) {
-                ((ReadEntryCallbackCtx) ctx).setLastAddConfirmed(maxLAC);
-            }
-            if (lacUpdateTimestamp > -1L && (ctx instanceof ReadLastConfirmedAndEntryContext)) {
-                ((ReadLastConfirmedAndEntryContext) ctx).setLacUpdateTimestamp(lacUpdateTimestamp);
-            }
-            cb.readEntryComplete(rc, ledgerId, entryId, buffer.slice(), ctx);
-        }
-    }
-
-    class BatchedReadCompletion extends CompletionValue {
-
-        final BatchedReadEntryCallback cb;
-
-        public BatchedReadCompletion(final CompletionKey key,
-                                     final BatchedReadEntryCallback originalCallback,
-                                     final Object originalCtx,
-                                     long ledgerId, final long entryId) {
-            super("BatchedRead", originalCtx, ledgerId, entryId,
-                    readEntryOpLogger, readTimeoutOpLogger);
-            this.cb = new BatchedReadEntryCallback() {
-
-                @Override
-                public void readEntriesComplete(int rc,
-                                                long ledgerId,
-                                                long startEntryId,
-                                                ByteBufList bufList,
-                                                Object ctx) {
-                    logOpResult(rc);
-                    originalCallback.readEntriesComplete(rc,
-                            ledgerId, entryId,
-                            bufList, originalCtx);
-                    key.release();
-                }
-            };
-        }
-
-        @Override
-        public void errorOut() {
-            errorOut(BKException.Code.BookieHandleNotAvailableException);
-        }
-
-        @Override
-        public void errorOut(final int rc) {
-            errorOutAndRunCallback(
-                    () -> cb.readEntriesComplete(rc, ledgerId,
-                            entryId, null, ctx));
-        }
-
-        @Override
-        public void handleV2Response(long ledgerId,
-                                     long entryId,
-                                     StatusCode status,
-                                     BookieProtocol.Response response) {
-
-            readEntryOutstanding.dec();
-            if (!(response instanceof BookieProtocol.BatchedReadResponse)) {
-                return;
-            }
-            BookieProtocol.BatchedReadResponse readResponse = (BookieProtocol.BatchedReadResponse) response;
-            handleBatchedReadResponse(ledgerId, entryId, status, readResponse.getData(),
-                    INVALID_ENTRY_ID, -1L);
-        }
-
-        @Override
-        public void handleV3Response(Response response) {
-            // V3 protocol haven't supported batched read yet.
-        }
-
-        private void handleBatchedReadResponse(long ledgerId,
-                                        long entryId,
-                                        StatusCode status,
-                                        ByteBufList buffers,
-                                        long maxLAC, // max known lac piggy-back from bookies
-                                        long lacUpdateTimestamp) { // the timestamp when the lac is updated.
-            int rc = convertStatus(status, BKException.Code.ReadException);
-
-            if (maxLAC > INVALID_ENTRY_ID && (ctx instanceof ReadEntryCallbackCtx)) {
-                ((ReadEntryCallbackCtx) ctx).setLastAddConfirmed(maxLAC);
-            }
-            if (lacUpdateTimestamp > -1L && (ctx instanceof ReadLastConfirmedAndEntryContext)) {
-                ((ReadLastConfirmedAndEntryContext) ctx).setLacUpdateTimestamp(lacUpdateTimestamp);
-            }
-            cb.readEntriesComplete(rc, ledgerId, entryId, buffers, ctx);
-        }
-    }
-
-    class StartTLSCompletion extends CompletionValue {
-        final StartTLSCallback cb;
-
-        public StartTLSCompletion(final CompletionKey key) {
-            super("StartTLS", null, -1, -1,
-                  startTLSOpLogger, startTLSTimeoutOpLogger);
-            this.cb = new StartTLSCallback() {
-                @Override
-                public void startTLSComplete(int rc, Object ctx) {
-                    logOpResult(rc);
-                    key.release();
-                }
-            };
-        }
-
-        @Override
-        public void errorOut() {
-            errorOut(BKException.Code.BookieHandleNotAvailableException);
-        }
-
-        @Override
-        public void errorOut(final int rc) {
-            failTLS(rc);
-        }
-
-        @Override
-        public void handleV3Response(BookkeeperProtocol.Response response) {
-            StatusCode status = response.getStatus();
-
-            if (LOG.isDebugEnabled()) {
-                logResponse(status);
-            }
-
-            int rc = convertStatus(status, BKException.Code.SecurityException);
-
-            // Cancel START_TLS request timeout
-            cb.startTLSComplete(rc, null);
-
-            if (state != ConnectionState.START_TLS) {
-                LOG.error("Connection state changed before TLS response received");
-                failTLS(BKException.Code.BookieHandleNotAvailableException);
-            } else if (status != StatusCode.EOK) {
-                LOG.error("Client received error {} during TLS negotiation", status);
-                failTLS(BKException.Code.SecurityException);
-            } else {
-                initTLSHandshake();
-            }
-        }
-
-    }
-
-    // visible for testing
-    class GetBookieInfoCompletion extends CompletionValue {
-        final GetBookieInfoCallback cb;
-
-        public GetBookieInfoCompletion(final CompletionKey key,
-                                       final GetBookieInfoCallback origCallback,
-                                       final Object origCtx) {
-            super("GetBookieInfo", origCtx, 0L, 0L,
-                  getBookieInfoOpLogger, getBookieInfoTimeoutOpLogger);
-            this.cb = new GetBookieInfoCallback() {
-                @Override
-                public void getBookieInfoComplete(int rc, BookieInfo bInfo,
-                                                  Object ctx) {
-                    logOpResult(rc);
-                    origCallback.getBookieInfoComplete(rc, bInfo, origCtx);
-                    key.release();
-                }
-            };
-        }
-
-        @Override
-        boolean maybeTimeout() {
-            if (MathUtils.elapsedNanos(startTime) >= getBookieInfoTimeoutNanos) {
-                timeout();
-                return true;
-            } else {
-                return false;
-            }
-        }
-
-        @Override
-        public void errorOut() {
-            errorOut(BKException.Code.BookieHandleNotAvailableException);
-        }
-
-        @Override
-        public void errorOut(final int rc) {
-            errorOutAndRunCallback(
-                    () -> cb.getBookieInfoComplete(rc, new BookieInfo(), ctx));
-        }
-
-        @Override
-        public void handleV3Response(BookkeeperProtocol.Response response) {
-            GetBookieInfoResponse getBookieInfoResponse = response.getGetBookieInfoResponse();
-            StatusCode status = response.getStatus() == StatusCode.EOK
-                ? getBookieInfoResponse.getStatus() : response.getStatus();
-
-            long freeDiskSpace = getBookieInfoResponse.getFreeDiskSpace();
-            long totalDiskSpace = getBookieInfoResponse.getTotalDiskCapacity();
-
-            if (LOG.isDebugEnabled()) {
-                logResponse(status, "freeDisk", freeDiskSpace, "totalDisk", totalDiskSpace);
-            }
-
-            int rc = convertStatus(status, BKException.Code.ReadException);
-            cb.getBookieInfoComplete(rc,
-                                     new BookieInfo(totalDiskSpace,
-                                                    freeDiskSpace), ctx);
-        }
-    }
-
-    class GetListOfEntriesOfLedgerCompletion extends CompletionValue {
-        final GetListOfEntriesOfLedgerCallback cb;
-
-        public GetListOfEntriesOfLedgerCompletion(final CompletionKey key,
-                final GetListOfEntriesOfLedgerCallback origCallback, final long ledgerId) {
-            super("GetListOfEntriesOfLedger", null, ledgerId, 0L, getListOfEntriesOfLedgerCompletionOpLogger,
-                    getListOfEntriesOfLedgerCompletionTimeoutOpLogger);
-            this.cb = new GetListOfEntriesOfLedgerCallback() {
-                @Override
-                public void getListOfEntriesOfLedgerComplete(int rc, long ledgerId,
-                        AvailabilityOfEntriesOfLedger availabilityOfEntriesOfLedger) {
-                    logOpResult(rc);
-                    origCallback.getListOfEntriesOfLedgerComplete(rc, ledgerId, availabilityOfEntriesOfLedger);
-                    key.release();
-                }
-            };
-        }
-
-        @Override
-        public void errorOut() {
-            errorOut(BKException.Code.BookieHandleNotAvailableException);
-        }
-
-        @Override
-        public void errorOut(final int rc) {
-            errorOutAndRunCallback(() -> cb.getListOfEntriesOfLedgerComplete(rc, ledgerId, null));
-        }
-
-        @Override
-        public void handleV3Response(BookkeeperProtocol.Response response) {
-            GetListOfEntriesOfLedgerResponse getListOfEntriesOfLedgerResponse = response
-                    .getGetListOfEntriesOfLedgerResponse();
-            ByteBuf availabilityOfEntriesOfLedgerBuffer = Unpooled.EMPTY_BUFFER;
-            StatusCode status = response.getStatus() == StatusCode.EOK ? getListOfEntriesOfLedgerResponse.getStatus()
-                    : response.getStatus();
-
-            if (getListOfEntriesOfLedgerResponse.hasAvailabilityOfEntriesOfLedger()) {
-                availabilityOfEntriesOfLedgerBuffer = Unpooled.wrappedBuffer(
-                        getListOfEntriesOfLedgerResponse.getAvailabilityOfEntriesOfLedger().asReadOnlyByteBuffer());
-            }
-
-            if (LOG.isDebugEnabled()) {
-                logResponse(status, "ledgerId", ledgerId);
-            }
-
-            int rc = convertStatus(status, BKException.Code.ReadException);
-            AvailabilityOfEntriesOfLedger availabilityOfEntriesOfLedger = null;
-            if (rc == BKException.Code.OK) {
-                availabilityOfEntriesOfLedger = new AvailabilityOfEntriesOfLedger(
-                        availabilityOfEntriesOfLedgerBuffer.slice());
-            }
-            cb.getListOfEntriesOfLedgerComplete(rc, ledgerId, availabilityOfEntriesOfLedger);
-        }
-    }
-
-    private final Recycler<AddCompletion> addCompletionRecycler = new Recycler<AddCompletion>() {
-            @Override
-            protected AddCompletion newObject(Recycler.Handle<AddCompletion> handle) {
-                return new AddCompletion(handle);
-            }
-        };
-
-    AddCompletion acquireAddCompletion(final CompletionKey key,
-                                       final WriteCallback originalCallback,
-                                       final Object originalCtx,
-                                       final long ledgerId, final long entryId) {
-        AddCompletion completion = addCompletionRecycler.get();
-        completion.reset(key, originalCallback, originalCtx, ledgerId, entryId);
-        return completion;
-    }
-
-    // visible for testing
-    class AddCompletion extends CompletionValue implements WriteCallback {
-        final Recycler.Handle<AddCompletion> handle;
-
-        CompletionKey key = null;
-        WriteCallback originalCallback = null;
-
-        AddCompletion(Recycler.Handle<AddCompletion> handle) {
-            super("Add", null, -1, -1, addEntryOpLogger, addTimeoutOpLogger);
-            this.handle = handle;
-        }
-
-        void reset(final CompletionKey key,
-                   final WriteCallback originalCallback,
-                   final Object originalCtx,
-                   final long ledgerId, final long entryId) {
-            this.key = key;
-            this.originalCallback = originalCallback;
-            this.ctx = originalCtx;
-            this.ledgerId = ledgerId;
-            this.entryId = entryId;
-            this.startTime = MathUtils.nowInNano();
-        }
-
-        @Override
-        public void writeComplete(int rc, long ledgerId, long entryId,
-                                  BookieId addr,
-                                  Object ctx) {
-            logOpResult(rc);
-            originalCallback.writeComplete(rc, ledgerId, entryId, addr, ctx);
-            key.release();
-            handle.recycle(this);
-        }
-
-        @Override
-        boolean maybeTimeout() {
-            if (MathUtils.elapsedNanos(startTime) >= addEntryTimeoutNanos) {
-                timeout();
-                return true;
-            } else {
-                return false;
-            }
-        }
-
-        @Override
-        public void errorOut() {
-            errorOut(BKException.Code.BookieHandleNotAvailableException);
-        }
-
-        @Override
-        public void errorOut(final int rc) {
-            errorOutAndRunCallback(
-                    () -> writeComplete(rc, ledgerId, entryId, bookieId, ctx));
-        }
-
-        @Override
-        public void setOutstanding() {
-            addEntryOutstanding.inc();
-        }
-
-        @Override
-        public void handleV2Response(
-                long ledgerId, long entryId, StatusCode status,
-                BookieProtocol.Response response) {
-            addEntryOutstanding.dec();
-            handleResponse(ledgerId, entryId, status);
-        }
-
-        @Override
-        public void handleV3Response(
-                BookkeeperProtocol.Response response) {
-            addEntryOutstanding.dec();
-            AddResponse addResponse = response.getAddResponse();
-            StatusCode status = response.getStatus() == StatusCode.EOK
-                ? addResponse.getStatus() : response.getStatus();
-            handleResponse(addResponse.getLedgerId(), addResponse.getEntryId(),
-                           status);
-        }
-
-        private void handleResponse(long ledgerId, long entryId,
-                                    StatusCode status) {
-            if (LOG.isDebugEnabled()) {
-                logResponse(status, "ledger", ledgerId, "entry", entryId);
-            }
-
-            int rc = convertStatus(status, BKException.Code.WriteException);
-            writeComplete(rc, ledgerId, entryId, bookieId, ctx);
-        }
-    }
-
     // visible for testing
     CompletionKey newCompletionKey(long txnId, OperationType operationType) {
         return new TxnCompletionKey(txnId, operationType);
     }
 
-    class TxnCompletionKey extends CompletionKey {
-        final long txnId;
-
-        public TxnCompletionKey(long txnId, OperationType operationType) {
-            super(operationType);
-            this.txnId = txnId;
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            if (!(obj instanceof TxnCompletionKey)) {
-                return false;
-            }
-            TxnCompletionKey that = (TxnCompletionKey) obj;
-            return this.txnId == that.txnId && this.operationType == that.operationType;
-        }
-
-        @Override
-        public int hashCode() {
-            return ((int) txnId);
-        }
-
-        @Override
-        public String toString() {
-            return String.format("TxnId(%d), OperationType(%s)", txnId, operationType);
-        }
-
-    }
-
-    abstract class CompletionKey {
-        OperationType operationType;
-
-        CompletionKey(OperationType operationType) {
-            this.operationType = operationType;
-        }
-
-        public void release() {}
-    }
-
-    /**
-     * Note : Helper functions follow
-     */
-
-    /**
-     * @param status
-     * @return {@link BKException.Code.UNINITIALIZED} if the statuscode is unknown.
-     */
-    private int statusCodeToExceptionCode(StatusCode status) {
-        switch (status) {
-            case EOK:
-                return BKException.Code.OK;
-            case ENOENTRY:
-                return BKException.Code.NoSuchEntryException;
-            case ENOLEDGER:
-                return BKException.Code.NoSuchLedgerExistsException;
-            case EBADVERSION:
-                return BKException.Code.ProtocolVersionException;
-            case EUA:
-                return BKException.Code.UnauthorizedAccessException;
-            case EFENCED:
-                return BKException.Code.LedgerFencedException;
-            case EREADONLY:
-                return BKException.Code.WriteOnReadOnlyBookieException;
-            case ETOOMANYREQUESTS:
-                return BKException.Code.TooManyRequestsException;
-            case EUNKNOWNLEDGERSTATE:
-                return BKException.Code.DataUnknownException;
-            default:
-                return BKException.Code.UNINITIALIZED;
-        }
-    }
-
     private void putCompletionKeyValue(CompletionKey key, CompletionValue value) {
         CompletionValue existingValue = completionObjects.putIfAbsent(key, value);
         if (existingValue != null) { // will only happen for V2 keys, as V3 have unique txnid
@@ -2484,64 +1667,6 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         return txnIdGenerator.incrementAndGet();
     }
 
-    private final Recycler<EntryCompletionKey> v2KeyRecycler = new Recycler<EntryCompletionKey>() {
-            @Override
-            protected EntryCompletionKey newObject(
-                    Recycler.Handle<EntryCompletionKey> handle) {
-                return new EntryCompletionKey(handle);
-            }
-        };
-
-    EntryCompletionKey acquireV2Key(long ledgerId, long entryId,
-                             OperationType operationType) {
-        EntryCompletionKey key = v2KeyRecycler.get();
-        key.reset(ledgerId, entryId, operationType);
-        return key;
-    }
-
-    private class EntryCompletionKey extends CompletionKey {
-        private final Handle<EntryCompletionKey> recyclerHandle;
-        long ledgerId;
-        long entryId;
-
-        private EntryCompletionKey(Handle<EntryCompletionKey> handle) {
-            super(null);
-            this.recyclerHandle = handle;
-        }
-
-        void reset(long ledgerId, long entryId, OperationType operationType) {
-            this.ledgerId = ledgerId;
-            this.entryId = entryId;
-            this.operationType = operationType;
-        }
-
-        @Override
-        public boolean equals(Object object) {
-            if (!(object instanceof EntryCompletionKey)) {
-                return  false;
-            }
-            EntryCompletionKey that = (EntryCompletionKey) object;
-            return this.entryId == that.entryId
-                && this.ledgerId == that.ledgerId
-                && this.operationType == that.operationType;
-        }
-
-        @Override
-        public int hashCode() {
-            return Long.hashCode(ledgerId) * 31 + Long.hashCode(entryId);
-        }
-
-        @Override
-        public String toString() {
-            return String.format("%d:%d %s", ledgerId, entryId, operationType);
-        }
-
-        @Override
-        public void release() {
-            recyclerHandle.recycle(this);
-        }
-    }
-
     Request.Builder withRequestContext(Request.Builder builder) {
         if (preserveMdcForTaskExecution) {
             return appendRequestContext(builder);
@@ -2710,7 +1835,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         final long txnId = getTxnId();
         final CompletionKey completionKey = new TxnCompletionKey(txnId, OperationType.START_TLS);
         completionObjects.put(completionKey,
-                              new StartTLSCompletion(completionKey));
+                              new StartTLSCompletion(completionKey, this));
         BookkeeperProtocol.Request.Builder h = withRequestContext(BookkeeperProtocol.Request.newBuilder());
         BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
                 .setVersion(ProtocolVersion.VERSION_THREE)
@@ -2722,7 +1847,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         writeAndFlush(channel, completionKey, h.build());
     }
 
-    private void failTLS(int rc) {
+    protected void failTLS(int rc) {
         LOG.error("TLS failure on: {}, rc: {}", channel, rc);
         Queue<GenericCallback<PerChannelBookieClient>> oldPendingOps;
         synchronized (this) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadCompletion.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadCompletion.java
new file mode 100644
index 0000000000..d9639397b1
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadCompletion.java
@@ -0,0 +1,127 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.proto;
+
+import static org.apache.bookkeeper.client.LedgerHandle.INVALID_ENTRY_ID;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.util.ReferenceCountUtil;
+import org.apache.bookkeeper.client.BKException;
+
+class ReadCompletion extends CompletionValue {
+    final BookkeeperInternalCallbacks.ReadEntryCallback cb;
+
+    public ReadCompletion(final CompletionKey key,
+                          final BookkeeperInternalCallbacks.ReadEntryCallback originalCallback,
+                          final Object originalCtx,
+                          long ledgerId, final long entryId,
+                          PerChannelBookieClient perChannelBookieClient) {
+        super("Read", originalCtx, ledgerId, entryId, perChannelBookieClient);
+        this.opLogger = perChannelBookieClient.readEntryOpLogger;
+        this.timeoutOpLogger = perChannelBookieClient.readTimeoutOpLogger;
+        this.cb = (rc, ledgerId1, entryId1, buffer, ctx) -> {
+            logOpResult(rc);
+            originalCallback.readEntryComplete(rc,
+                    ledgerId1, entryId1,
+                    buffer, originalCtx);
+            key.release();
+        };
+    }
+
+    @Override
+    public void errorOut() {
+        errorOut(BKException.Code.BookieHandleNotAvailableException);
+    }
+
+    @Override
+    public void errorOut(final int rc) {
+        errorOutAndRunCallback(
+                () -> cb.readEntryComplete(rc, ledgerId,
+                        entryId, null, ctx));
+    }
+
+    @Override
+    public void setOutstanding() {
+        perChannelBookieClient.readEntryOutstanding.inc();
+    }
+
+    @Override
+    public void handleV2Response(long ledgerId, long entryId,
+                                 BookkeeperProtocol.StatusCode status,
+                                 BookieProtocol.Response response) {
+        perChannelBookieClient.readEntryOutstanding.dec();
+        if (!(response instanceof BookieProtocol.ReadResponse)) {
+            return;
+        }
+        BookieProtocol.ReadResponse readResponse = (BookieProtocol.ReadResponse) response;
+        handleReadResponse(ledgerId, entryId, status, readResponse.getData(),
+                INVALID_ENTRY_ID, -1L);
+    }
+
+    @Override
+    public void handleV3Response(BookkeeperProtocol.Response response) {
+        perChannelBookieClient.readEntryOutstanding.dec();
+        BookkeeperProtocol.ReadResponse readResponse = response.getReadResponse();
+        BookkeeperProtocol.StatusCode status = response.getStatus() == BookkeeperProtocol.StatusCode.EOK
+                ? readResponse.getStatus() : response.getStatus();
+        ByteBuf buffer = Unpooled.EMPTY_BUFFER;
+        if (readResponse.hasBody()) {
+            buffer = Unpooled.wrappedBuffer(readResponse.getBody().asReadOnlyByteBuffer());
+        }
+        long maxLAC = INVALID_ENTRY_ID;
+        if (readResponse.hasMaxLAC()) {
+            maxLAC = readResponse.getMaxLAC();
+        }
+        long lacUpdateTimestamp = -1L;
+        if (readResponse.hasLacUpdateTimestamp()) {
+            lacUpdateTimestamp = readResponse.getLacUpdateTimestamp();
+        }
+        handleReadResponse(readResponse.getLedgerId(),
+                readResponse.getEntryId(),
+                status, buffer, maxLAC, lacUpdateTimestamp);
+        ReferenceCountUtil.release(
+                buffer); // meaningless using unpooled, but client may expect to hold the last reference
+    }
+
+    private void handleReadResponse(long ledgerId,
+                                    long entryId,
+                                    BookkeeperProtocol.StatusCode status,
+                                    ByteBuf buffer,
+                                    long maxLAC, // max known lac piggy-back from bookies
+                                    long lacUpdateTimestamp) { // the timestamp when the lac is updated.
+        int readableBytes = buffer.readableBytes();
+        if (LOG.isDebugEnabled()) {
+            logResponse(status, "ledger", ledgerId, "entry", entryId, "entryLength", readableBytes);
+        }
+
+        int rc = convertStatus(status, BKException.Code.ReadException);
+
+        if (maxLAC > INVALID_ENTRY_ID && (ctx instanceof BookkeeperInternalCallbacks.ReadEntryCallbackCtx)) {
+            ((BookkeeperInternalCallbacks.ReadEntryCallbackCtx) ctx).setLastAddConfirmed(maxLAC);
+        }
+        if (lacUpdateTimestamp > -1L && (ctx instanceof ReadLastConfirmedAndEntryContext)) {
+            ((ReadLastConfirmedAndEntryContext) ctx).setLacUpdateTimestamp(lacUpdateTimestamp);
+        }
+        cb.readEntryComplete(rc, ledgerId, entryId, buffer.slice(), ctx);
+    }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacCompletion.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacCompletion.java
new file mode 100644
index 0000000000..a5eadbf49a
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacCompletion.java
@@ -0,0 +1,87 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.proto;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.bookkeeper.client.BKException;
+
+class ReadLacCompletion extends CompletionValue {
+    final BookkeeperInternalCallbacks.ReadLacCallback cb;
+
+    public ReadLacCompletion(final CompletionKey key,
+                             BookkeeperInternalCallbacks.ReadLacCallback originalCallback,
+                             final Object ctx, final long ledgerId,
+                             PerChannelBookieClient perChannelBookieClient) {
+        super("ReadLAC", ctx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, perChannelBookieClient);
+        this.opLogger = perChannelBookieClient.readLacOpLogger;
+        this.timeoutOpLogger = perChannelBookieClient.readLacTimeoutOpLogger;
+        this.cb = new BookkeeperInternalCallbacks.ReadLacCallback() {
+            @Override
+            public void readLacComplete(int rc, long ledgerId,
+                                        ByteBuf lacBuffer,
+                                        ByteBuf lastEntryBuffer,
+                                        Object ctx) {
+                logOpResult(rc);
+                originalCallback.readLacComplete(
+                        rc, ledgerId, lacBuffer, lastEntryBuffer, ctx);
+                key.release();
+            }
+        };
+    }
+
+    @Override
+    public void errorOut() {
+        errorOut(BKException.Code.BookieHandleNotAvailableException);
+    }
+
+    @Override
+    public void errorOut(final int rc) {
+        errorOutAndRunCallback(
+                () -> cb.readLacComplete(rc, ledgerId, null, null, ctx));
+    }
+
+    @Override
+    public void handleV3Response(BookkeeperProtocol.Response response) {
+        BookkeeperProtocol.ReadLacResponse readLacResponse = response.getReadLacResponse();
+        ByteBuf lacBuffer = Unpooled.EMPTY_BUFFER;
+        ByteBuf lastEntryBuffer = Unpooled.EMPTY_BUFFER;
+        BookkeeperProtocol.StatusCode status = response.getStatus() == BookkeeperProtocol.StatusCode.EOK
+                ? readLacResponse.getStatus() : response.getStatus();
+
+        if (readLacResponse.hasLacBody()) {
+            lacBuffer = Unpooled.wrappedBuffer(readLacResponse.getLacBody().asReadOnlyByteBuffer());
+        }
+
+        if (readLacResponse.hasLastEntryBody()) {
+            lastEntryBuffer = Unpooled.wrappedBuffer(readLacResponse.getLastEntryBody().asReadOnlyByteBuffer());
+        }
+
+        if (LOG.isDebugEnabled()) {
+            logResponse(status, "ledgerId", ledgerId);
+        }
+
+        int rc = convertStatus(status, BKException.Code.ReadException);
+        cb.readLacComplete(rc, ledgerId, lacBuffer.slice(),
+                lastEntryBuffer.slice(), ctx);
+    }
+}
\ No newline at end of file
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/StartTLSCompletion.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/StartTLSCompletion.java
new file mode 100644
index 0000000000..71491ed467
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/StartTLSCompletion.java
@@ -0,0 +1,75 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.proto;
+
+import org.apache.bookkeeper.client.BKException;
+
+class StartTLSCompletion extends CompletionValue {
+    final BookkeeperInternalCallbacks.StartTLSCallback cb;
+
+    public StartTLSCompletion(final CompletionKey key, PerChannelBookieClient perChannelBookieClient) {
+        super("StartTLS", null, -1, -1, perChannelBookieClient);
+        this.opLogger = perChannelBookieClient.startTLSOpLogger;
+        this.timeoutOpLogger = perChannelBookieClient.startTLSTimeoutOpLogger;
+        this.cb = new BookkeeperInternalCallbacks.StartTLSCallback() {
+            @Override
+            public void startTLSComplete(int rc, Object ctx) {
+                logOpResult(rc);
+                key.release();
+            }
+        };
+    }
+
+    @Override
+    public void errorOut() {
+        errorOut(BKException.Code.BookieHandleNotAvailableException);
+    }
+
+    @Override
+    public void errorOut(final int rc) {
+        perChannelBookieClient.failTLS(rc);
+    }
+
+    @Override
+    public void handleV3Response(BookkeeperProtocol.Response response) {
+        BookkeeperProtocol.StatusCode status = response.getStatus();
+
+        if (LOG.isDebugEnabled()) {
+            logResponse(status);
+        }
+
+        int rc = convertStatus(status, BKException.Code.SecurityException);
+
+        // Cancel START_TLS request timeout
+        cb.startTLSComplete(rc, null);
+
+        if (perChannelBookieClient.state != PerChannelBookieClient.ConnectionState.START_TLS) {
+            LOG.error("Connection state changed before TLS response received");
+            perChannelBookieClient.failTLS(BKException.Code.BookieHandleNotAvailableException);
+        } else if (status != BookkeeperProtocol.StatusCode.EOK) {
+            LOG.error("Client received error {} during TLS negotiation", status);
+            perChannelBookieClient.failTLS(BKException.Code.SecurityException);
+        } else {
+            perChannelBookieClient.initTLSHandshake();
+        }
+    }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/TxnCompletionKey.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/TxnCompletionKey.java
new file mode 100644
index 0000000000..bbf64681e1
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/TxnCompletionKey.java
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.proto;
+
+class TxnCompletionKey extends CompletionKey {
+    final long txnId;
+
+    public TxnCompletionKey(long txnId, BookkeeperProtocol.OperationType operationType) {
+        super(operationType);
+        this.txnId = txnId;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (!(obj instanceof TxnCompletionKey)) {
+            return false;
+        }
+        TxnCompletionKey that = (TxnCompletionKey) obj;
+        return this.txnId == that.txnId && this.operationType == that.operationType;
+    }
+
+    @Override
+    public int hashCode() {
+        return ((int) txnId);
+    }
+
+    @Override
+    public String toString() {
+        return String.format("TxnId(%d), OperationType(%s)", txnId, operationType);
+    }
+
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacCompletion.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacCompletion.java
new file mode 100644
index 0000000000..9d4ca0c39a
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacCompletion.java
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.proto;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.net.BookieId;
+
+class WriteLacCompletion extends CompletionValue {
+    final BookkeeperInternalCallbacks.WriteLacCallback cb;
+
+    public WriteLacCompletion(final CompletionKey key,
+                              final BookkeeperInternalCallbacks.WriteLacCallback originalCallback,
+                              final Object originalCtx,
+                              final long ledgerId,
+                              PerChannelBookieClient perChannelBookieClient) {
+        super("WriteLAC",
+                originalCtx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, perChannelBookieClient);
+        this.opLogger = perChannelBookieClient.writeLacOpLogger;
+        this.timeoutOpLogger = perChannelBookieClient.writeLacTimeoutOpLogger;
+        this.cb = new BookkeeperInternalCallbacks.WriteLacCallback() {
+            @Override
+            public void writeLacComplete(int rc, long ledgerId,
+                                         BookieId addr,
+                                         Object ctx) {
+                logOpResult(rc);
+                originalCallback.writeLacComplete(rc, ledgerId,
+                        addr, originalCtx);
+                key.release();
+            }
+        };
+    }
+
+    @Override
+    public void errorOut() {
+        errorOut(BKException.Code.BookieHandleNotAvailableException);
+    }
+
+    @Override
+    public void errorOut(final int rc) {
+        errorOutAndRunCallback(
+                () -> cb.writeLacComplete(rc, ledgerId, perChannelBookieClient.bookieId, ctx));
+    }
+
+    @Override
+    public void handleV3Response(BookkeeperProtocol.Response response) {
+        BookkeeperProtocol.WriteLacResponse writeLacResponse = response.getWriteLacResponse();
+        BookkeeperProtocol.StatusCode status = response.getStatus() == BookkeeperProtocol.StatusCode.EOK
+                ? writeLacResponse.getStatus() : response.getStatus();
+        long ledgerId = writeLacResponse.getLedgerId();
+
+        if (LOG.isDebugEnabled()) {
+            logResponse(status, "ledger", ledgerId);
+        }
+        int rc = convertStatus(status, BKException.Code.WriteException);
+        cb.writeLacComplete(rc, ledgerId, perChannelBookieClient.bookieId, ctx);
+    }
+}

Reply via email to