This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-4.17
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/branch-4.17 by this push:
new 137cc946d9 Fix Memory Leak In Netty Recycler of Bookie Client (#4609)
137cc946d9 is described below
commit 137cc946d908f2e388c2b320001de3290f67ef60
Author: ken <[email protected]>
AuthorDate: Tue Jun 3 05:12:51 2025 +0800
Fix Memory Leak In Netty Recycler of Bookie Client (#4609)
* make recycler static to avoid OOM problem
* Fixed missing license headers
* Fixed checkstyle
* More checkstyle
---------
Co-authored-by: fanjianye <[email protected]>
Co-authored-by: Matteo Merli <[email protected]>
---
.../org/apache/bookkeeper/proto/AddCompletion.java | 151 ++++
.../bookkeeper/proto/BatchedReadCompletion.java | 98 +++
.../org/apache/bookkeeper/proto/CompletionKey.java | 34 +
.../apache/bookkeeper/proto/CompletionValue.java | 179 ++++
.../bookkeeper/proto/EntryCompletionKey.java | 82 ++
.../bookkeeper/proto/ForceLedgerCompletion.java | 70 ++
.../bookkeeper/proto/GetBookieInfoCompletion.java | 84 ++
.../proto/GetListOfEntriesOfLedgerCompletion.java | 84 ++
.../bookkeeper/proto/PerChannelBookieClient.java | 949 +--------------------
.../apache/bookkeeper/proto/ReadCompletion.java | 127 +++
.../apache/bookkeeper/proto/ReadLacCompletion.java | 87 ++
.../bookkeeper/proto/StartTLSCompletion.java | 75 ++
.../apache/bookkeeper/proto/TxnCompletionKey.java | 51 ++
.../bookkeeper/proto/WriteLacCompletion.java | 76 ++
14 files changed, 1235 insertions(+), 912 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AddCompletion.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AddCompletion.java
new file mode 100644
index 0000000000..c21226dd36
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AddCompletion.java
@@ -0,0 +1,151 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.proto;
+
+import io.netty.util.Recycler;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.common.util.MathUtils;
+import org.apache.bookkeeper.net.BookieId;
+import org.slf4j.MDC;
+
+class AddCompletion extends CompletionValue implements
BookkeeperInternalCallbacks.WriteCallback {
+
+ static AddCompletion acquireAddCompletion(final CompletionKey key,
+ final
BookkeeperInternalCallbacks.WriteCallback originalCallback,
+ final Object originalCtx,
+ final long ledgerId, final long
entryId,
+ PerChannelBookieClient
perChannelBookieClient) {
+ AddCompletion completion = ADD_COMPLETION_RECYCLER.get();
+ completion.reset(key, originalCallback, originalCtx, ledgerId,
entryId, perChannelBookieClient);
+ return completion;
+ }
+
+ final Recycler.Handle<AddCompletion> handle;
+
+ CompletionKey key = null;
+ BookkeeperInternalCallbacks.WriteCallback originalCallback = null;
+
+ AddCompletion(Recycler.Handle<AddCompletion> handle) {
+ super("Add", null, -1, -1, null);
+ this.handle = handle;
+ }
+
+ void reset(final CompletionKey key,
+ final BookkeeperInternalCallbacks.WriteCallback
originalCallback,
+ final Object originalCtx,
+ final long ledgerId, final long entryId,
+ PerChannelBookieClient perChannelBookieClient) {
+ this.key = key;
+ this.originalCallback = originalCallback;
+ this.ctx = originalCtx;
+ this.ledgerId = ledgerId;
+ this.entryId = entryId;
+ this.startTime =
org.apache.bookkeeper.common.util.MathUtils.nowInNano();
+
+ this.opLogger = perChannelBookieClient.addEntryOpLogger;
+ this.timeoutOpLogger = perChannelBookieClient.addTimeoutOpLogger;
+ this.perChannelBookieClient = perChannelBookieClient;
+ this.mdcContextMap =
perChannelBookieClient.preserveMdcForTaskExecution ? MDC.getCopyOfContextMap()
: null;
+ }
+
+ @Override
+ public void release() {
+ this.ctx = null;
+ this.opLogger = null;
+ this.timeoutOpLogger = null;
+ this.perChannelBookieClient = null;
+ this.mdcContextMap = null;
+ handle.recycle(this);
+ }
+
+ @Override
+ public void writeComplete(int rc, long ledgerId, long entryId,
+ BookieId addr,
+ Object ctx) {
+ logOpResult(rc);
+ originalCallback.writeComplete(rc, ledgerId, entryId, addr, ctx);
+ key.release();
+ this.release();
+ }
+
+ @Override
+ boolean maybeTimeout() {
+ if (MathUtils.elapsedNanos(startTime) >=
perChannelBookieClient.addEntryTimeoutNanos) {
+ timeout();
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public void errorOut() {
+ errorOut(BKException.Code.BookieHandleNotAvailableException);
+ }
+
+ @Override
+ public void errorOut(final int rc) {
+ errorOutAndRunCallback(
+ () -> writeComplete(rc, ledgerId, entryId,
perChannelBookieClient.bookieId, ctx));
+ }
+
+ @Override
+ public void setOutstanding() {
+ perChannelBookieClient.addEntryOutstanding.inc();
+ }
+
+ @Override
+ public void handleV2Response(
+ long ledgerId, long entryId, BookkeeperProtocol.StatusCode status,
+ BookieProtocol.Response response) {
+ perChannelBookieClient.addEntryOutstanding.dec();
+ handleResponse(ledgerId, entryId, status);
+ }
+
+ @Override
+ public void handleV3Response(
+ BookkeeperProtocol.Response response) {
+ perChannelBookieClient.addEntryOutstanding.dec();
+ BookkeeperProtocol.AddResponse addResponse = response.getAddResponse();
+ BookkeeperProtocol.StatusCode status = response.getStatus() ==
BookkeeperProtocol.StatusCode.EOK
+ ? addResponse.getStatus() : response.getStatus();
+ handleResponse(addResponse.getLedgerId(), addResponse.getEntryId(),
+ status);
+ }
+
+ private void handleResponse(long ledgerId, long entryId,
+ BookkeeperProtocol.StatusCode status) {
+ if (LOG.isDebugEnabled()) {
+ logResponse(status, "ledger", ledgerId, "entry", entryId);
+ }
+
+ int rc = convertStatus(status, BKException.Code.WriteException);
+ writeComplete(rc, ledgerId, entryId, perChannelBookieClient.bookieId,
ctx);
+ }
+
+ private static final Recycler<AddCompletion> ADD_COMPLETION_RECYCLER = new
Recycler<AddCompletion>() {
+ @Override
+ protected AddCompletion newObject(Recycler.Handle<AddCompletion>
handle) {
+ return new AddCompletion(handle);
+ }
+ };
+}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BatchedReadCompletion.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BatchedReadCompletion.java
new file mode 100644
index 0000000000..c28fcc6c2d
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BatchedReadCompletion.java
@@ -0,0 +1,98 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.proto;
+
+import static org.apache.bookkeeper.client.LedgerHandle.INVALID_ENTRY_ID;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.util.ByteBufList;
+
+class BatchedReadCompletion extends CompletionValue {
+
+ final BookkeeperInternalCallbacks.BatchedReadEntryCallback cb;
+
+ public BatchedReadCompletion(final CompletionKey key,
+ final
BookkeeperInternalCallbacks.BatchedReadEntryCallback originalCallback,
+ final Object originalCtx,
+ long ledgerId, final long entryId,
+ PerChannelBookieClient
perChannelBookieClient) {
+ super("BatchedRead", originalCtx, ledgerId, entryId,
perChannelBookieClient);
+ this.opLogger = perChannelBookieClient.readEntryOpLogger;
+ this.timeoutOpLogger = perChannelBookieClient.readTimeoutOpLogger;
+ this.cb = (rc, ledgerId1, startEntryId, bufList, ctx) -> {
+ logOpResult(rc);
+ originalCallback.readEntriesComplete(rc,
+ ledgerId1, entryId,
+ bufList, originalCtx);
+ key.release();
+ };
+ }
+
+ @Override
+ public void errorOut() {
+ errorOut(BKException.Code.BookieHandleNotAvailableException);
+ }
+
+ @Override
+ public void errorOut(final int rc) {
+ errorOutAndRunCallback(
+ () -> cb.readEntriesComplete(rc, ledgerId,
+ entryId, null, ctx));
+ }
+
+ @Override
+ public void handleV2Response(long ledgerId,
+ long entryId,
+ BookkeeperProtocol.StatusCode status,
+ BookieProtocol.Response response) {
+
+ perChannelBookieClient.readEntryOutstanding.dec();
+ if (!(response instanceof BookieProtocol.BatchedReadResponse)) {
+ return;
+ }
+ BookieProtocol.BatchedReadResponse readResponse =
(BookieProtocol.BatchedReadResponse) response;
+ handleBatchedReadResponse(ledgerId, entryId, status,
readResponse.getData(),
+ INVALID_ENTRY_ID, -1L);
+ }
+
+ @Override
+ public void handleV3Response(BookkeeperProtocol.Response response) {
+ // V3 protocol haven't supported batched read yet.
+ }
+
+ private void handleBatchedReadResponse(long ledgerId,
+ long entryId,
+ BookkeeperProtocol.StatusCode
status,
+ ByteBufList buffers,
+ long maxLAC, // max known lac
piggy-back from bookies
+ long lacUpdateTimestamp) { // the
timestamp when the lac is updated.
+ int rc = convertStatus(status, BKException.Code.ReadException);
+
+ if (maxLAC > INVALID_ENTRY_ID && (ctx instanceof
BookkeeperInternalCallbacks.ReadEntryCallbackCtx)) {
+ ((BookkeeperInternalCallbacks.ReadEntryCallbackCtx)
ctx).setLastAddConfirmed(maxLAC);
+ }
+ if (lacUpdateTimestamp > -1L && (ctx instanceof
ReadLastConfirmedAndEntryContext)) {
+ ((ReadLastConfirmedAndEntryContext)
ctx).setLacUpdateTimestamp(lacUpdateTimestamp);
+ }
+ cb.readEntriesComplete(rc, ledgerId, entryId, buffers, ctx);
+ }
+}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/CompletionKey.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/CompletionKey.java
new file mode 100644
index 0000000000..0d543676e5
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/CompletionKey.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.proto;
+
+import org.apache.bookkeeper.proto.BookkeeperProtocol.OperationType;
+
+abstract class CompletionKey {
+ OperationType operationType;
+
+ CompletionKey(OperationType operationType) {
+ this.operationType = operationType;
+ }
+
+ public void release() {}
+}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/CompletionValue.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/CompletionValue.java
new file mode 100644
index 0000000000..f434e0a0c5
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/CompletionValue.java
@@ -0,0 +1,179 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.proto;
+
+import com.google.common.base.Joiner;
+import io.netty.channel.Channel;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.common.util.MathUtils;
+import org.apache.bookkeeper.common.util.MdcUtils;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+abstract class CompletionValue {
+ private final String operationName;
+ protected Object ctx;
+ protected long ledgerId;
+ protected long entryId;
+ protected long startTime;
+ protected OpStatsLogger opLogger;
+ protected OpStatsLogger timeoutOpLogger;
+ protected Map<String, String> mdcContextMap;
+ protected PerChannelBookieClient perChannelBookieClient;
+
+ static final Logger LOG = LoggerFactory.getLogger(CompletionValue.class);
+
+ public CompletionValue(String operationName,
+ Object ctx,
+ long ledgerId, long entryId, PerChannelBookieClient
perChannelBookieClient) {
+ this.operationName = operationName;
+ this.ctx = ctx;
+ this.ledgerId = ledgerId;
+ this.entryId = entryId;
+ this.startTime = MathUtils.nowInNano();
+ this.perChannelBookieClient = perChannelBookieClient;
+ if (perChannelBookieClient != null) {
+ this.mdcContextMap =
perChannelBookieClient.preserveMdcForTaskExecution ? MDC.getCopyOfContextMap()
: null;
+ }
+ }
+
+ private long latency() {
+ return MathUtils.elapsedNanos(startTime);
+ }
+
+ void logOpResult(int rc) {
+ if (rc != BKException.Code.OK) {
+ opLogger.registerFailedEvent(latency(), TimeUnit.NANOSECONDS);
+ } else {
+ opLogger.registerSuccessfulEvent(latency(), TimeUnit.NANOSECONDS);
+ }
+
+ if (rc != BKException.Code.OK
+ &&
!PerChannelBookieClient.EXPECTED_BK_OPERATION_ERRORS.contains(rc)) {
+ perChannelBookieClient.recordError();
+ }
+ }
+
+ boolean maybeTimeout() {
+ if (MathUtils.elapsedNanos(startTime) >=
perChannelBookieClient.readEntryTimeoutNanos) {
+ timeout();
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ void timeout() {
+ errorOut(BKException.Code.TimeoutException);
+ timeoutOpLogger.registerSuccessfulEvent(latency(),
+ TimeUnit.NANOSECONDS);
+ }
+
+ protected void logResponse(BookkeeperProtocol.StatusCode status, Object...
extraInfo) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Got {} response from bookie:{} rc:{}, {}",
operationName,
+ perChannelBookieClient.bookieId, status,
Joiner.on(":").join(extraInfo));
+ }
+ }
+
+ protected int convertStatus(BookkeeperProtocol.StatusCode status, int
defaultStatus) {
+ // convert to BKException code
+ int rcToRet = statusCodeToExceptionCode(status);
+ if (rcToRet == BKException.Code.UNINITIALIZED) {
+ LOG.error("{} for failed on bookie {} code {}",
+ operationName, perChannelBookieClient.bookieId, status);
+ return defaultStatus;
+ } else {
+ return rcToRet;
+ }
+ }
+
+ /**
+ * @param status
+ * @return {@link BKException.Code.UNINITIALIZED} if the statuscode is
unknown.
+ */
+ private int statusCodeToExceptionCode(BookkeeperProtocol.StatusCode
status) {
+ switch (status) {
+ case EOK:
+ return BKException.Code.OK;
+ case ENOENTRY:
+ return BKException.Code.NoSuchEntryException;
+ case ENOLEDGER:
+ return BKException.Code.NoSuchLedgerExistsException;
+ case EBADVERSION:
+ return BKException.Code.ProtocolVersionException;
+ case EUA:
+ return BKException.Code.UnauthorizedAccessException;
+ case EFENCED:
+ return BKException.Code.LedgerFencedException;
+ case EREADONLY:
+ return BKException.Code.WriteOnReadOnlyBookieException;
+ case ETOOMANYREQUESTS:
+ return BKException.Code.TooManyRequestsException;
+ case EUNKNOWNLEDGERSTATE:
+ return BKException.Code.DataUnknownException;
+ default:
+ return BKException.Code.UNINITIALIZED;
+ }
+ }
+
+ public void restoreMdcContext() {
+ MdcUtils.restoreContext(mdcContextMap);
+ }
+
+ public abstract void errorOut();
+ public abstract void errorOut(int rc);
+ public void setOutstanding() {
+ // no-op
+ }
+
+ protected void errorOutAndRunCallback(final Runnable callback) {
+ perChannelBookieClient.executor.executeOrdered(ledgerId, () -> {
+ String bAddress = "null";
+ Channel c = perChannelBookieClient.channel;
+ if (c != null && c.remoteAddress() != null) {
+ bAddress = c.remoteAddress().toString();
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Could not write {} request to bookie {} for ledger
{}, entry {}",
+ operationName, bAddress,
+ ledgerId, entryId);
+ }
+ callback.run();
+ });
+ }
+
+ public void handleV2Response(
+ long ledgerId, long entryId, BookkeeperProtocol.StatusCode status,
+ BookieProtocol.Response response) {
+ LOG.warn("Unhandled V2 response {}", response);
+ }
+
+ public abstract void handleV3Response(
+ BookkeeperProtocol.Response response);
+
+ public void release() {}
+}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/EntryCompletionKey.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/EntryCompletionKey.java
new file mode 100644
index 0000000000..09f6413f7f
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/EntryCompletionKey.java
@@ -0,0 +1,82 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.proto;
+
+import io.netty.util.Recycler;
+
+class EntryCompletionKey extends CompletionKey {
+ private final Recycler.Handle<EntryCompletionKey> recyclerHandle;
+ long ledgerId;
+ long entryId;
+
+ static EntryCompletionKey acquireV2Key(long ledgerId, long entryId,
+ BookkeeperProtocol.OperationType
operationType) {
+ EntryCompletionKey key = V2_KEY_RECYCLER.get();
+ key.reset(ledgerId, entryId, operationType);
+ return key;
+ }
+
+ private EntryCompletionKey(Recycler.Handle<EntryCompletionKey> handle) {
+ super(null);
+ this.recyclerHandle = handle;
+ }
+
+ void reset(long ledgerId, long entryId, BookkeeperProtocol.OperationType
operationType) {
+ this.ledgerId = ledgerId;
+ this.entryId = entryId;
+ this.operationType = operationType;
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ if (!(object instanceof EntryCompletionKey)) {
+ return false;
+ }
+ EntryCompletionKey that = (EntryCompletionKey) object;
+ return this.entryId == that.entryId
+ && this.ledgerId == that.ledgerId
+ && this.operationType == that.operationType;
+ }
+
+ @Override
+ public int hashCode() {
+ return Long.hashCode(ledgerId) * 31 + Long.hashCode(entryId);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%d:%d %s", ledgerId, entryId, operationType);
+ }
+
+ public void release() {
+ recyclerHandle.recycle(this);
+ }
+
+ private static final Recycler<EntryCompletionKey> V2_KEY_RECYCLER =
+ new Recycler<EntryCompletionKey>() {
+ @Override
+ protected EntryCompletionKey newObject(
+ Recycler.Handle<EntryCompletionKey> handle) {
+ return new EntryCompletionKey(handle);
+ }
+ };
+}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerCompletion.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerCompletion.java
new file mode 100644
index 0000000000..e0717056f4
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerCompletion.java
@@ -0,0 +1,70 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.proto;
+
+import org.apache.bookkeeper.client.BKException;
+
+class ForceLedgerCompletion extends CompletionValue {
+ final BookkeeperInternalCallbacks.ForceLedgerCallback cb;
+
+ public ForceLedgerCompletion(final CompletionKey key,
+ final
BookkeeperInternalCallbacks.ForceLedgerCallback originalCallback,
+ final Object originalCtx,
+ final long ledgerId,
+ PerChannelBookieClient
perChannelBookieClient) {
+ super("ForceLedger",
+ originalCtx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED,
perChannelBookieClient);
+ this.opLogger = perChannelBookieClient.forceLedgerOpLogger;
+ this.timeoutOpLogger =
perChannelBookieClient.forceLedgerTimeoutOpLogger;
+ this.cb = (rc, ledgerId1, addr, ctx) -> {
+ logOpResult(rc);
+ originalCallback.forceLedgerComplete(rc, ledgerId1,
+ addr, originalCtx);
+ key.release();
+ };
+ }
+
+ @Override
+ public void errorOut() {
+ errorOut(BKException.Code.BookieHandleNotAvailableException);
+ }
+
+ @Override
+ public void errorOut(final int rc) {
+ errorOutAndRunCallback(
+ () -> cb.forceLedgerComplete(rc, ledgerId,
perChannelBookieClient.bookieId, ctx));
+ }
+
+ @Override
+ public void handleV3Response(BookkeeperProtocol.Response response) {
+ BookkeeperProtocol.ForceLedgerResponse forceLedgerResponse =
response.getForceLedgerResponse();
+ BookkeeperProtocol.StatusCode status = response.getStatus() ==
BookkeeperProtocol.StatusCode.EOK
+ ? forceLedgerResponse.getStatus() : response.getStatus();
+ long ledgerId = forceLedgerResponse.getLedgerId();
+
+ if (LOG.isDebugEnabled()) {
+ logResponse(status, "ledger", ledgerId);
+ }
+ int rc = convertStatus(status, BKException.Code.WriteException);
+ cb.forceLedgerComplete(rc, ledgerId, perChannelBookieClient.bookieId,
ctx);
+ }
+}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoCompletion.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoCompletion.java
new file mode 100644
index 0000000000..e502749d79
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoCompletion.java
@@ -0,0 +1,84 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.proto;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookieInfoReader;
+import org.apache.bookkeeper.common.util.MathUtils;
+
+class GetBookieInfoCompletion extends CompletionValue {
+ final BookkeeperInternalCallbacks.GetBookieInfoCallback cb;
+
+ public GetBookieInfoCompletion(final CompletionKey key,
+ final
BookkeeperInternalCallbacks.GetBookieInfoCallback origCallback,
+ final Object origCtx,
+ PerChannelBookieClient
perChannelBookieClient) {
+ super("GetBookieInfo", origCtx, 0L, 0L, perChannelBookieClient);
+ this.opLogger = perChannelBookieClient.getBookieInfoOpLogger;
+ this.timeoutOpLogger =
perChannelBookieClient.getBookieInfoTimeoutOpLogger;
+ this.cb = (rc, bInfo, ctx) -> {
+ logOpResult(rc);
+ origCallback.getBookieInfoComplete(rc, bInfo, origCtx);
+ key.release();
+ };
+ }
+
+ @Override
+ boolean maybeTimeout() {
+ if (MathUtils.elapsedNanos(startTime) >=
perChannelBookieClient.getBookieInfoTimeoutNanos) {
+ timeout();
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public void errorOut() {
+ errorOut(BKException.Code.BookieHandleNotAvailableException);
+ }
+
+ @Override
+ public void errorOut(final int rc) {
+ errorOutAndRunCallback(
+ () -> cb.getBookieInfoComplete(rc, new
BookieInfoReader.BookieInfo(), ctx));
+ }
+
+ @Override
+ public void handleV3Response(BookkeeperProtocol.Response response) {
+ BookkeeperProtocol.GetBookieInfoResponse getBookieInfoResponse =
response.getGetBookieInfoResponse();
+ BookkeeperProtocol.StatusCode status = response.getStatus() ==
BookkeeperProtocol.StatusCode.EOK
+ ? getBookieInfoResponse.getStatus() : response.getStatus();
+
+ long freeDiskSpace = getBookieInfoResponse.getFreeDiskSpace();
+ long totalDiskSpace = getBookieInfoResponse.getTotalDiskCapacity();
+
+ if (LOG.isDebugEnabled()) {
+ logResponse(status, "freeDisk", freeDiskSpace, "totalDisk",
totalDiskSpace);
+ }
+
+ int rc = convertStatus(status, BKException.Code.ReadException);
+ cb.getBookieInfoComplete(rc,
+ new BookieInfoReader.BookieInfo(totalDiskSpace,
+ freeDiskSpace), ctx);
+ }
+}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetListOfEntriesOfLedgerCompletion.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetListOfEntriesOfLedgerCompletion.java
new file mode 100644
index 0000000000..fea19b83d3
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetListOfEntriesOfLedgerCompletion.java
@@ -0,0 +1,84 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.proto;
+
+import static
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetListOfEntriesOfLedgerCallback;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.util.AvailabilityOfEntriesOfLedger;
+
+class GetListOfEntriesOfLedgerCompletion extends CompletionValue {
+ final BookkeeperInternalCallbacks.GetListOfEntriesOfLedgerCallback cb;
+
+ public GetListOfEntriesOfLedgerCompletion(final CompletionKey key,
+ final
GetListOfEntriesOfLedgerCallback origCallback,
+ final long ledgerId,
+ PerChannelBookieClient
perChannelBookieClient) {
+ super("GetListOfEntriesOfLedger", null, ledgerId, 0L,
perChannelBookieClient);
+ this.opLogger =
perChannelBookieClient.getListOfEntriesOfLedgerCompletionOpLogger;
+ this.timeoutOpLogger =
perChannelBookieClient.getListOfEntriesOfLedgerCompletionTimeoutOpLogger;
+ this.cb = (rc, ledgerId1, availabilityOfEntriesOfLedger) -> {
+ logOpResult(rc);
+ origCallback.getListOfEntriesOfLedgerComplete(rc, ledgerId1,
availabilityOfEntriesOfLedger);
+ key.release();
+ };
+ }
+
+ @Override
+ public void errorOut() {
+ errorOut(BKException.Code.BookieHandleNotAvailableException);
+ }
+
+ @Override
+ public void errorOut(final int rc) {
+ errorOutAndRunCallback(() -> cb.getListOfEntriesOfLedgerComplete(rc,
ledgerId, null));
+ }
+
+ @Override
+ public void handleV3Response(BookkeeperProtocol.Response response) {
+ BookkeeperProtocol.GetListOfEntriesOfLedgerResponse
getListOfEntriesOfLedgerResponse = response
+ .getGetListOfEntriesOfLedgerResponse();
+ ByteBuf availabilityOfEntriesOfLedgerBuffer = Unpooled.EMPTY_BUFFER;
+ BookkeeperProtocol.StatusCode status =
+ response.getStatus() == BookkeeperProtocol.StatusCode.EOK ?
getListOfEntriesOfLedgerResponse.getStatus()
+ : response.getStatus();
+
+ if
(getListOfEntriesOfLedgerResponse.hasAvailabilityOfEntriesOfLedger()) {
+ availabilityOfEntriesOfLedgerBuffer = Unpooled.wrappedBuffer(
+
getListOfEntriesOfLedgerResponse.getAvailabilityOfEntriesOfLedger().asReadOnlyByteBuffer());
+ }
+
+ if (LOG.isDebugEnabled()) {
+ logResponse(status, "ledgerId", ledgerId);
+ }
+
+ int rc = convertStatus(status, BKException.Code.ReadException);
+ AvailabilityOfEntriesOfLedger availabilityOfEntriesOfLedger = null;
+ if (rc == BKException.Code.OK) {
+ availabilityOfEntriesOfLedger = new AvailabilityOfEntriesOfLedger(
+ availabilityOfEntriesOfLedgerBuffer.slice());
+ }
+ cb.getListOfEntriesOfLedgerComplete(rc, ledgerId,
availabilityOfEntriesOfLedger);
+ }
+}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 100d3667ef..d5faaa2d71 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -18,9 +18,6 @@
*/
package org.apache.bookkeeper.proto;
-import static org.apache.bookkeeper.client.LedgerHandle.INVALID_ENTRY_ID;
-
-import com.google.common.base.Joiner;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import com.google.protobuf.ExtensionRegistry;
@@ -28,7 +25,6 @@ import com.google.protobuf.UnsafeByteOperations;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
@@ -61,7 +57,6 @@ import io.netty.incubator.channel.uring.IOUringEventLoopGroup;
import io.netty.incubator.channel.uring.IOUringSocketChannel;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
-import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
@@ -95,7 +90,6 @@ import org.apache.bookkeeper.auth.BookKeeperPrincipal;
import org.apache.bookkeeper.auth.ClientAuthProvider;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeperClientStats;
-import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.common.util.MdcUtils;
import org.apache.bookkeeper.common.util.OrderedExecutor;
@@ -108,31 +102,22 @@ import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetListOfEntriesOfLedgerCallback;
import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
-import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallbackCtx;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
-import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.StartTLSCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse;
import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader;
import org.apache.bookkeeper.proto.BookkeeperProtocol.ForceLedgerRequest;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.ForceLedgerResponse;
import org.apache.bookkeeper.proto.BookkeeperProtocol.GetBookieInfoRequest;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.GetBookieInfoResponse;
import
org.apache.bookkeeper.proto.BookkeeperProtocol.GetListOfEntriesOfLedgerRequest;
-import
org.apache.bookkeeper.proto.BookkeeperProtocol.GetListOfEntriesOfLedgerResponse;
import org.apache.bookkeeper.proto.BookkeeperProtocol.OperationType;
import org.apache.bookkeeper.proto.BookkeeperProtocol.ProtocolVersion;
import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacResponse;
import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadRequest;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse;
import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
import org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacResponse;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.OpStatsLogger;
@@ -141,7 +126,6 @@ import org.apache.bookkeeper.stats.annotations.StatsDoc;
import org.apache.bookkeeper.tls.SecurityException;
import org.apache.bookkeeper.tls.SecurityHandlerFactory;
import org.apache.bookkeeper.tls.SecurityHandlerFactory.NodeType;
-import org.apache.bookkeeper.util.AvailabilityOfEntriesOfLedger;
import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.bookkeeper.util.StringUtils;
@@ -165,7 +149,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
static final Logger LOG =
LoggerFactory.getLogger(PerChannelBookieClient.class);
// this set contains the bookie error return codes that we do not consider
for a bookie to be "faulty"
- private static final Set<Integer> expectedBkOperationErrors =
Collections.unmodifiableSet(Sets
+ protected static final Set<Integer> EXPECTED_BK_OPERATION_ERRORS =
Collections.unmodifiableSet(Sets
.newHashSet(BKException.Code.BookieHandleNotAvailableException,
BKException.Code.NoSuchEntryException,
BKException.Code.NoSuchLedgerExistsException,
@@ -201,79 +185,79 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
name = BookKeeperClientStats.CHANNEL_READ_OP,
help = "channel stats of read entries requests"
)
- private final OpStatsLogger readEntryOpLogger;
+ protected final OpStatsLogger readEntryOpLogger;
@StatsDoc(
name = BookKeeperClientStats.CHANNEL_TIMEOUT_READ,
help = "timeout stats of read entries requests"
)
- private final OpStatsLogger readTimeoutOpLogger;
+ protected final OpStatsLogger readTimeoutOpLogger;
@StatsDoc(
name = BookKeeperClientStats.CHANNEL_ADD_OP,
help = "channel stats of add entries requests"
)
- private final OpStatsLogger addEntryOpLogger;
+ protected final OpStatsLogger addEntryOpLogger;
@StatsDoc(
name = BookKeeperClientStats.CHANNEL_WRITE_LAC_OP,
help = "channel stats of write_lac requests"
)
- private final OpStatsLogger writeLacOpLogger;
+ protected final OpStatsLogger writeLacOpLogger;
@StatsDoc(
name = BookKeeperClientStats.CHANNEL_FORCE_OP,
help = "channel stats of force requests"
)
- private final OpStatsLogger forceLedgerOpLogger;
+ protected final OpStatsLogger forceLedgerOpLogger;
@StatsDoc(
name = BookKeeperClientStats.CHANNEL_READ_LAC_OP,
help = "channel stats of read_lac requests"
)
- private final OpStatsLogger readLacOpLogger;
+ protected final OpStatsLogger readLacOpLogger;
@StatsDoc(
name = BookKeeperClientStats.CHANNEL_TIMEOUT_ADD,
help = "timeout stats of add entries requests"
)
- private final OpStatsLogger addTimeoutOpLogger;
+ protected final OpStatsLogger addTimeoutOpLogger;
@StatsDoc(
name = BookKeeperClientStats.CHANNEL_TIMEOUT_WRITE_LAC,
help = "timeout stats of write_lac requests"
)
- private final OpStatsLogger writeLacTimeoutOpLogger;
+ protected final OpStatsLogger writeLacTimeoutOpLogger;
@StatsDoc(
name = BookKeeperClientStats.CHANNEL_TIMEOUT_FORCE,
help = "timeout stats of force requests"
)
- private final OpStatsLogger forceLedgerTimeoutOpLogger;
+ protected final OpStatsLogger forceLedgerTimeoutOpLogger;
@StatsDoc(
name = BookKeeperClientStats.CHANNEL_TIMEOUT_READ_LAC,
help = "timeout stats of read_lac requests"
)
- private final OpStatsLogger readLacTimeoutOpLogger;
+ protected final OpStatsLogger readLacTimeoutOpLogger;
@StatsDoc(
name = BookKeeperClientStats.GET_BOOKIE_INFO_OP,
help = "channel stats of get_bookie_info requests"
)
- private final OpStatsLogger getBookieInfoOpLogger;
+ protected final OpStatsLogger getBookieInfoOpLogger;
@StatsDoc(
name = BookKeeperClientStats.TIMEOUT_GET_BOOKIE_INFO,
help = "timeout stats of get_bookie_info requests"
)
- private final OpStatsLogger getBookieInfoTimeoutOpLogger;
+ protected final OpStatsLogger getBookieInfoTimeoutOpLogger;
@StatsDoc(
name = BookKeeperClientStats.CHANNEL_START_TLS_OP,
help = "channel stats of start_tls requests"
)
- private final OpStatsLogger startTLSOpLogger;
+ protected final OpStatsLogger startTLSOpLogger;
@StatsDoc(
name = BookKeeperClientStats.CHANNEL_TIMEOUT_START_TLS_OP,
help = "timeout stats of start_tls requests"
)
- private final OpStatsLogger startTLSTimeoutOpLogger;
+ protected final OpStatsLogger startTLSTimeoutOpLogger;
@StatsDoc(
name = BookKeeperClientStats.CLIENT_CONNECT_TIMER,
help = "channel stats of connect requests"
)
private final OpStatsLogger connectTimer;
- private final OpStatsLogger getListOfEntriesOfLedgerCompletionOpLogger;
- private final OpStatsLogger
getListOfEntriesOfLedgerCompletionTimeoutOpLogger;
+ protected final OpStatsLogger getListOfEntriesOfLedgerCompletionOpLogger;
+ protected final OpStatsLogger
getListOfEntriesOfLedgerCompletionTimeoutOpLogger;
@StatsDoc(
name = BookKeeperClientStats.NETTY_EXCEPTION_CNT,
help = "the number of exceptions received from this channel"
@@ -283,12 +267,12 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
name = BookKeeperClientStats.ADD_OP_OUTSTANDING,
help = "the number of outstanding add_entry requests"
)
- private final Counter addEntryOutstanding;
+ protected final Counter addEntryOutstanding;
@StatsDoc(
name = BookKeeperClientStats.READ_OP_OUTSTANDING,
help = "the number of outstanding add_entry requests"
)
- private final Counter readEntryOutstanding;
+ protected final Counter readEntryOutstanding;
/* collect stats on all Ops that flows through netty pipeline */
@StatsDoc(
name = BookKeeperClientStats.NETTY_OPS,
@@ -317,7 +301,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
private final Counter failedTlsHandshakeCounter;
private final boolean useV2WireProtocol;
- private final boolean preserveMdcForTaskExecution;
+ protected final boolean preserveMdcForTaskExecution;
/**
* The following member variables do not need to be concurrent, or volatile
@@ -343,7 +327,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
volatile ConnectionState state;
final ReentrantReadWriteLock closeLock = new ReentrantReadWriteLock();
- private final ClientConfiguration conf;
+ protected final ClientConfiguration conf;
private final PerChannelBookieClientPool pcbcPool;
private final ClientAuthProvider.Factory authProviderFactory;
@@ -694,7 +678,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
// writeLac is mostly like addEntry hence uses addEntryTimeout
completionObjects.put(completionKey,
new WriteLacCompletion(completionKey, cb,
- ctx, ledgerId));
+ ctx, ledgerId, this));
// Build the request
BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
@@ -733,7 +717,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
// force is mostly like addEntry hence uses addEntryTimeout
completionObjects.put(completionKey,
new ForceLedgerCompletion(completionKey, cb,
- ctx, ledgerId));
+ ctx, ledgerId, this));
// Build the request
BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
@@ -783,7 +767,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
cb.writeComplete(BKException.Code.IllegalOpException,
ledgerId, entryId, bookieId, ctx);
return;
}
- completionKey = acquireV2Key(ledgerId, entryId,
OperationType.ADD_ENTRY);
+ completionKey = EntryCompletionKey.acquireV2Key(ledgerId, entryId,
OperationType.ADD_ENTRY);
if (toSend instanceof ByteBuf) {
ByteBuf byteBuf = ((ByteBuf) toSend).retainedDuplicate();
@@ -835,8 +819,8 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
}
putCompletionKeyValue(completionKey,
- acquireAddCompletion(completionKey,
- cb, ctx, ledgerId,
entryId));
+ AddCompletion.acquireAddCompletion(completionKey,
+ cb, ctx, ledgerId, entryId,
this));
// addEntry times out on backpressure
writeAndFlush(channel, completionKey, request, allowFastFail,
cleanupActionFailedBeforeWrite,
cleanupActionAfterWrite);
@@ -848,7 +832,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
if (useV2WireProtocol) {
request =
BookieProtocol.ReadRequest.create(BookieProtocol.CURRENT_PROTOCOL_VERSION,
ledgerId, 0, (short) 0,
null);
- completionKey = acquireV2Key(ledgerId, 0, OperationType.READ_LAC);
+ completionKey = EntryCompletionKey.acquireV2Key(ledgerId, 0,
OperationType.READ_LAC);
} else {
final long txnId = getTxnId();
completionKey = new TxnCompletionKey(txnId,
OperationType.READ_LAC);
@@ -867,14 +851,15 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
}
putCompletionKeyValue(completionKey,
new ReadLacCompletion(completionKey, cb,
- ctx, ledgerId));
+ ctx, ledgerId, this));
writeAndFlush(channel, completionKey, request);
}
public void getListOfEntriesOfLedger(final long ledgerId,
GetListOfEntriesOfLedgerCallback cb) {
final long txnId = getTxnId();
final CompletionKey completionKey = new TxnCompletionKey(txnId,
OperationType.GET_LIST_OF_ENTRIES_OF_LEDGER);
- completionObjects.put(completionKey, new
GetListOfEntriesOfLedgerCompletion(completionKey, cb, ledgerId));
+ completionObjects.put(completionKey, new
GetListOfEntriesOfLedgerCompletion(
+ completionKey, cb, ledgerId, this));
// Build the request.
BKPacketHeader.Builder headerBuilder =
BKPacketHeader.newBuilder().setVersion(ProtocolVersion.VERSION_THREE)
@@ -932,7 +917,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
if (useV2WireProtocol) {
request =
BookieProtocol.ReadRequest.create(BookieProtocol.CURRENT_PROTOCOL_VERSION,
ledgerId, entryId, (short) flags, masterKey);
- completionKey = acquireV2Key(ledgerId, entryId,
OperationType.READ_ENTRY);
+ completionKey = EntryCompletionKey.acquireV2Key(ledgerId, entryId,
OperationType.READ_ENTRY);
} else {
final long txnId = getTxnId();
completionKey = new TxnCompletionKey(txnId,
OperationType.READ_ENTRY);
@@ -991,7 +976,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
.build();
}
- ReadCompletion readCompletion = new ReadCompletion(completionKey, cb,
ctx, ledgerId, entryId);
+ ReadCompletion readCompletion = new ReadCompletion(completionKey, cb,
ctx, ledgerId, entryId, this);
putCompletionKeyValue(completionKey, readCompletion);
writeAndFlush(channel, completionKey, request, allowFastFail, null,
null);
@@ -1034,7 +1019,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
throw new UnsupportedOperationException("Unsupported batch read
entry operation for v3 protocol.");
}
BatchedReadCompletion readCompletion = new BatchedReadCompletion(
- completionKey, cb, ctx, ledgerId, startEntryId);
+ completionKey, cb, ctx, ledgerId, startEntryId, this);
putCompletionKeyValue(completionKey, readCompletion);
writeAndFlush(channel, completionKey, request, allowFastFail, null,
null);
@@ -1045,7 +1030,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
final CompletionKey completionKey = new TxnCompletionKey(txnId,
OperationType.GET_BOOKIE_INFO);
completionObjects.put(completionKey,
new GetBookieInfoCompletion(
- completionKey, cb, ctx));
+ completionKey, cb, ctx, this));
// Build the request and calculate the total size to be included in
the packet.
BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
@@ -1409,7 +1394,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
if (OperationType.BATCH_READ_ENTRY == operationType) {
key = new TxnCompletionKey(((BookieProtocol.BatchedReadResponse)
response).getRequestId(), operationType);
} else {
- key = acquireV2Key(response.ledgerId, response.entryId,
operationType);
+ key = EntryCompletionKey.acquireV2Key(response.ledgerId,
response.entryId, operationType);
}
CompletionValue completionValue = getCompletionValue(key);
key.release();
@@ -1652,813 +1637,11 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
});
}
- /**
- * Boiler-plate wrapper classes follow.
- *
- */
-
- // visible for testing
- abstract class CompletionValue {
- private final OpStatsLogger opLogger;
- private final OpStatsLogger timeoutOpLogger;
- private final String operationName;
- private final Map<String, String> mdcContextMap;
- protected Object ctx;
- protected long ledgerId;
- protected long entryId;
- protected long startTime;
-
- public CompletionValue(String operationName,
- Object ctx,
- long ledgerId, long entryId,
- OpStatsLogger opLogger,
- OpStatsLogger timeoutOpLogger) {
- this.operationName = operationName;
- this.ctx = ctx;
- this.ledgerId = ledgerId;
- this.entryId = entryId;
- this.startTime = MathUtils.nowInNano();
- this.opLogger = opLogger;
- this.timeoutOpLogger = timeoutOpLogger;
- this.mdcContextMap = preserveMdcForTaskExecution ?
MDC.getCopyOfContextMap() : null;
- }
-
- private long latency() {
- return MathUtils.elapsedNanos(startTime);
- }
-
- void logOpResult(int rc) {
- if (rc != BKException.Code.OK) {
- opLogger.registerFailedEvent(latency(), TimeUnit.NANOSECONDS);
- } else {
- opLogger.registerSuccessfulEvent(latency(),
TimeUnit.NANOSECONDS);
- }
-
- if (rc != BKException.Code.OK
- && !expectedBkOperationErrors.contains(rc)) {
- recordError();
- }
- }
-
- boolean maybeTimeout() {
- if (MathUtils.elapsedNanos(startTime) >= readEntryTimeoutNanos) {
- timeout();
- return true;
- } else {
- return false;
- }
- }
-
- void timeout() {
- errorOut(BKException.Code.TimeoutException);
- timeoutOpLogger.registerSuccessfulEvent(latency(),
- TimeUnit.NANOSECONDS);
- }
-
- protected void logResponse(StatusCode status, Object... extraInfo) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Got {} response from bookie:{} rc:{}, {}",
operationName, bookieId, status,
- Joiner.on(":").join(extraInfo));
- }
- }
-
- protected int convertStatus(StatusCode status, int defaultStatus) {
- // convert to BKException code
- int rcToRet = statusCodeToExceptionCode(status);
- if (rcToRet == BKException.Code.UNINITIALIZED) {
- LOG.error("{} for failed on bookie {} code {}",
- operationName, bookieId, status);
- return defaultStatus;
- } else {
- return rcToRet;
- }
- }
-
- public void restoreMdcContext() {
- MdcUtils.restoreContext(mdcContextMap);
- }
-
- public abstract void errorOut();
- public abstract void errorOut(int rc);
- public void setOutstanding() {
- // no-op
- }
-
- protected void errorOutAndRunCallback(final Runnable callback) {
- executor.executeOrdered(ledgerId, () -> {
- String bAddress = "null";
- Channel c = channel;
- if (c != null && c.remoteAddress() != null) {
- bAddress = c.remoteAddress().toString();
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Could not write {} request to bookie {} for
ledger {}, entry {}",
- operationName, bAddress,
- ledgerId, entryId);
- }
- callback.run();
- });
- }
-
- public void handleV2Response(
- long ledgerId, long entryId, StatusCode status,
- BookieProtocol.Response response) {
- LOG.warn("Unhandled V2 response {}", response);
- }
-
- public abstract void handleV3Response(
- BookkeeperProtocol.Response response);
- }
-
- // visible for testing
- class WriteLacCompletion extends CompletionValue {
- final WriteLacCallback cb;
-
- public WriteLacCompletion(final CompletionKey key,
- final WriteLacCallback originalCallback,
- final Object originalCtx,
- final long ledgerId) {
- super("WriteLAC",
- originalCtx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED,
- writeLacOpLogger, writeLacTimeoutOpLogger);
- this.cb = new WriteLacCallback() {
- @Override
- public void writeLacComplete(int rc, long ledgerId,
- BookieId addr,
- Object ctx) {
- logOpResult(rc);
- originalCallback.writeLacComplete(rc, ledgerId,
- addr, originalCtx);
- key.release();
- }
- };
- }
-
- @Override
- public void errorOut() {
- errorOut(BKException.Code.BookieHandleNotAvailableException);
- }
-
- @Override
- public void errorOut(final int rc) {
- errorOutAndRunCallback(
- () -> cb.writeLacComplete(rc, ledgerId, bookieId, ctx));
- }
-
- @Override
- public void handleV3Response(BookkeeperProtocol.Response response) {
- WriteLacResponse writeLacResponse = response.getWriteLacResponse();
- StatusCode status = response.getStatus() == StatusCode.EOK
- ? writeLacResponse.getStatus() : response.getStatus();
- long ledgerId = writeLacResponse.getLedgerId();
-
- if (LOG.isDebugEnabled()) {
- logResponse(status, "ledger", ledgerId);
- }
- int rc = convertStatus(status, BKException.Code.WriteException);
- cb.writeLacComplete(rc, ledgerId, bookieId, ctx);
- }
- }
-
- class ForceLedgerCompletion extends CompletionValue {
- final ForceLedgerCallback cb;
-
- public ForceLedgerCompletion(final CompletionKey key,
- final ForceLedgerCallback originalCallback,
- final Object originalCtx,
- final long ledgerId) {
- super("ForceLedger",
- originalCtx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED,
- forceLedgerOpLogger, forceLedgerTimeoutOpLogger);
- this.cb = new ForceLedgerCallback() {
- @Override
- public void forceLedgerComplete(int rc, long ledgerId,
- BookieId addr,
- Object ctx) {
- logOpResult(rc);
- originalCallback.forceLedgerComplete(rc, ledgerId,
- addr, originalCtx);
- key.release();
- }
- };
- }
-
- @Override
- public void errorOut() {
- errorOut(BKException.Code.BookieHandleNotAvailableException);
- }
-
- @Override
- public void errorOut(final int rc) {
- errorOutAndRunCallback(
- () -> cb.forceLedgerComplete(rc, ledgerId, bookieId, ctx));
- }
-
- @Override
- public void handleV3Response(BookkeeperProtocol.Response response) {
- ForceLedgerResponse forceLedgerResponse =
response.getForceLedgerResponse();
- StatusCode status = response.getStatus() == StatusCode.EOK
- ? forceLedgerResponse.getStatus() : response.getStatus();
- long ledgerId = forceLedgerResponse.getLedgerId();
-
- if (LOG.isDebugEnabled()) {
- logResponse(status, "ledger", ledgerId);
- }
- int rc = convertStatus(status, BKException.Code.WriteException);
- cb.forceLedgerComplete(rc, ledgerId, bookieId, ctx);
- }
- }
-
- // visible for testing
- class ReadLacCompletion extends CompletionValue {
- final ReadLacCallback cb;
-
- public ReadLacCompletion(final CompletionKey key,
- ReadLacCallback originalCallback,
- final Object ctx, final long ledgerId) {
- super("ReadLAC", ctx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED,
- readLacOpLogger, readLacTimeoutOpLogger);
- this.cb = new ReadLacCallback() {
- @Override
- public void readLacComplete(int rc, long ledgerId,
- ByteBuf lacBuffer,
- ByteBuf lastEntryBuffer,
- Object ctx) {
- logOpResult(rc);
- originalCallback.readLacComplete(
- rc, ledgerId, lacBuffer, lastEntryBuffer, ctx);
- key.release();
- }
- };
- }
-
- @Override
- public void errorOut() {
- errorOut(BKException.Code.BookieHandleNotAvailableException);
- }
-
- @Override
- public void errorOut(final int rc) {
- errorOutAndRunCallback(
- () -> cb.readLacComplete(rc, ledgerId, null, null, ctx));
- }
-
- @Override
- public void handleV3Response(BookkeeperProtocol.Response response) {
- ReadLacResponse readLacResponse = response.getReadLacResponse();
- ByteBuf lacBuffer = Unpooled.EMPTY_BUFFER;
- ByteBuf lastEntryBuffer = Unpooled.EMPTY_BUFFER;
- StatusCode status = response.getStatus() == StatusCode.EOK
- ? readLacResponse.getStatus() : response.getStatus();
-
- if (readLacResponse.hasLacBody()) {
- lacBuffer =
Unpooled.wrappedBuffer(readLacResponse.getLacBody().asReadOnlyByteBuffer());
- }
-
- if (readLacResponse.hasLastEntryBody()) {
- lastEntryBuffer =
Unpooled.wrappedBuffer(readLacResponse.getLastEntryBody().asReadOnlyByteBuffer());
- }
-
- if (LOG.isDebugEnabled()) {
- logResponse(status, "ledgerId", ledgerId);
- }
-
- int rc = convertStatus(status, BKException.Code.ReadException);
- cb.readLacComplete(rc, ledgerId, lacBuffer.slice(),
- lastEntryBuffer.slice(), ctx);
- }
- }
-
- // visible for testing
- class ReadCompletion extends CompletionValue {
- final ReadEntryCallback cb;
-
- public ReadCompletion(final CompletionKey key,
- final ReadEntryCallback originalCallback,
- final Object originalCtx,
- long ledgerId, final long entryId) {
- super("Read", originalCtx, ledgerId, entryId,
- readEntryOpLogger, readTimeoutOpLogger);
-
- this.cb = new ReadEntryCallback() {
- @Override
- public void readEntryComplete(int rc, long ledgerId,
- long entryId, ByteBuf buffer,
- Object ctx) {
- logOpResult(rc);
- originalCallback.readEntryComplete(rc,
- ledgerId, entryId,
- buffer,
originalCtx);
- key.release();
- }
- };
- }
-
- @Override
- public void errorOut() {
- errorOut(BKException.Code.BookieHandleNotAvailableException);
- }
-
- @Override
- public void errorOut(final int rc) {
- errorOutAndRunCallback(
- () -> cb.readEntryComplete(rc, ledgerId,
- entryId, null, ctx));
- }
-
- @Override
- public void setOutstanding() {
- readEntryOutstanding.inc();
- }
-
- @Override
- public void handleV2Response(long ledgerId, long entryId,
- StatusCode status,
- BookieProtocol.Response response) {
- readEntryOutstanding.dec();
- if (!(response instanceof BookieProtocol.ReadResponse)) {
- return;
- }
- BookieProtocol.ReadResponse readResponse =
(BookieProtocol.ReadResponse) response;
- handleReadResponse(ledgerId, entryId, status,
readResponse.getData(),
- INVALID_ENTRY_ID, -1L);
- }
-
- @Override
- public void handleV3Response(BookkeeperProtocol.Response response) {
- readEntryOutstanding.dec();
- ReadResponse readResponse = response.getReadResponse();
- StatusCode status = response.getStatus() == StatusCode.EOK
- ? readResponse.getStatus() : response.getStatus();
- ByteBuf buffer = Unpooled.EMPTY_BUFFER;
- if (readResponse.hasBody()) {
- buffer =
Unpooled.wrappedBuffer(readResponse.getBody().asReadOnlyByteBuffer());
- }
- long maxLAC = INVALID_ENTRY_ID;
- if (readResponse.hasMaxLAC()) {
- maxLAC = readResponse.getMaxLAC();
- }
- long lacUpdateTimestamp = -1L;
- if (readResponse.hasLacUpdateTimestamp()) {
- lacUpdateTimestamp = readResponse.getLacUpdateTimestamp();
- }
- handleReadResponse(readResponse.getLedgerId(),
- readResponse.getEntryId(),
- status, buffer, maxLAC, lacUpdateTimestamp);
- ReferenceCountUtil.release(
- buffer); // meaningless using unpooled, but client may
expect to hold the last reference
- }
-
- private void handleReadResponse(long ledgerId,
- long entryId,
- StatusCode status,
- ByteBuf buffer,
- long maxLAC, // max known lac
piggy-back from bookies
- long lacUpdateTimestamp) { // the
timestamp when the lac is updated.
- int readableBytes = buffer.readableBytes();
- if (LOG.isDebugEnabled()) {
- logResponse(status, "ledger", ledgerId, "entry", entryId,
"entryLength", readableBytes);
- }
-
- int rc = convertStatus(status, BKException.Code.ReadException);
-
- if (maxLAC > INVALID_ENTRY_ID && (ctx instanceof
ReadEntryCallbackCtx)) {
- ((ReadEntryCallbackCtx) ctx).setLastAddConfirmed(maxLAC);
- }
- if (lacUpdateTimestamp > -1L && (ctx instanceof
ReadLastConfirmedAndEntryContext)) {
- ((ReadLastConfirmedAndEntryContext)
ctx).setLacUpdateTimestamp(lacUpdateTimestamp);
- }
- cb.readEntryComplete(rc, ledgerId, entryId, buffer.slice(), ctx);
- }
- }
-
- class BatchedReadCompletion extends CompletionValue {
-
- final BatchedReadEntryCallback cb;
-
- public BatchedReadCompletion(final CompletionKey key,
- final BatchedReadEntryCallback
originalCallback,
- final Object originalCtx,
- long ledgerId, final long entryId) {
- super("BatchedRead", originalCtx, ledgerId, entryId,
- readEntryOpLogger, readTimeoutOpLogger);
- this.cb = new BatchedReadEntryCallback() {
-
- @Override
- public void readEntriesComplete(int rc,
- long ledgerId,
- long startEntryId,
- ByteBufList bufList,
- Object ctx) {
- logOpResult(rc);
- originalCallback.readEntriesComplete(rc,
- ledgerId, entryId,
- bufList, originalCtx);
- key.release();
- }
- };
- }
-
- @Override
- public void errorOut() {
- errorOut(BKException.Code.BookieHandleNotAvailableException);
- }
-
- @Override
- public void errorOut(final int rc) {
- errorOutAndRunCallback(
- () -> cb.readEntriesComplete(rc, ledgerId,
- entryId, null, ctx));
- }
-
- @Override
- public void handleV2Response(long ledgerId,
- long entryId,
- StatusCode status,
- BookieProtocol.Response response) {
-
- readEntryOutstanding.dec();
- if (!(response instanceof BookieProtocol.BatchedReadResponse)) {
- return;
- }
- BookieProtocol.BatchedReadResponse readResponse =
(BookieProtocol.BatchedReadResponse) response;
- handleBatchedReadResponse(ledgerId, entryId, status,
readResponse.getData(),
- INVALID_ENTRY_ID, -1L);
- }
-
- @Override
- public void handleV3Response(Response response) {
- // V3 protocol haven't supported batched read yet.
- }
-
- private void handleBatchedReadResponse(long ledgerId,
- long entryId,
- StatusCode status,
- ByteBufList buffers,
- long maxLAC, // max known lac
piggy-back from bookies
- long lacUpdateTimestamp) { // the
timestamp when the lac is updated.
- int rc = convertStatus(status, BKException.Code.ReadException);
-
- if (maxLAC > INVALID_ENTRY_ID && (ctx instanceof
ReadEntryCallbackCtx)) {
- ((ReadEntryCallbackCtx) ctx).setLastAddConfirmed(maxLAC);
- }
- if (lacUpdateTimestamp > -1L && (ctx instanceof
ReadLastConfirmedAndEntryContext)) {
- ((ReadLastConfirmedAndEntryContext)
ctx).setLacUpdateTimestamp(lacUpdateTimestamp);
- }
- cb.readEntriesComplete(rc, ledgerId, entryId, buffers, ctx);
- }
- }
-
- class StartTLSCompletion extends CompletionValue {
- final StartTLSCallback cb;
-
- public StartTLSCompletion(final CompletionKey key) {
- super("StartTLS", null, -1, -1,
- startTLSOpLogger, startTLSTimeoutOpLogger);
- this.cb = new StartTLSCallback() {
- @Override
- public void startTLSComplete(int rc, Object ctx) {
- logOpResult(rc);
- key.release();
- }
- };
- }
-
- @Override
- public void errorOut() {
- errorOut(BKException.Code.BookieHandleNotAvailableException);
- }
-
- @Override
- public void errorOut(final int rc) {
- failTLS(rc);
- }
-
- @Override
- public void handleV3Response(BookkeeperProtocol.Response response) {
- StatusCode status = response.getStatus();
-
- if (LOG.isDebugEnabled()) {
- logResponse(status);
- }
-
- int rc = convertStatus(status, BKException.Code.SecurityException);
-
- // Cancel START_TLS request timeout
- cb.startTLSComplete(rc, null);
-
- if (state != ConnectionState.START_TLS) {
- LOG.error("Connection state changed before TLS response
received");
- failTLS(BKException.Code.BookieHandleNotAvailableException);
- } else if (status != StatusCode.EOK) {
- LOG.error("Client received error {} during TLS negotiation",
status);
- failTLS(BKException.Code.SecurityException);
- } else {
- initTLSHandshake();
- }
- }
-
- }
-
- // visible for testing
- class GetBookieInfoCompletion extends CompletionValue {
- final GetBookieInfoCallback cb;
-
- public GetBookieInfoCompletion(final CompletionKey key,
- final GetBookieInfoCallback
origCallback,
- final Object origCtx) {
- super("GetBookieInfo", origCtx, 0L, 0L,
- getBookieInfoOpLogger, getBookieInfoTimeoutOpLogger);
- this.cb = new GetBookieInfoCallback() {
- @Override
- public void getBookieInfoComplete(int rc, BookieInfo bInfo,
- Object ctx) {
- logOpResult(rc);
- origCallback.getBookieInfoComplete(rc, bInfo, origCtx);
- key.release();
- }
- };
- }
-
- @Override
- boolean maybeTimeout() {
- if (MathUtils.elapsedNanos(startTime) >=
getBookieInfoTimeoutNanos) {
- timeout();
- return true;
- } else {
- return false;
- }
- }
-
- @Override
- public void errorOut() {
- errorOut(BKException.Code.BookieHandleNotAvailableException);
- }
-
- @Override
- public void errorOut(final int rc) {
- errorOutAndRunCallback(
- () -> cb.getBookieInfoComplete(rc, new BookieInfo(), ctx));
- }
-
- @Override
- public void handleV3Response(BookkeeperProtocol.Response response) {
- GetBookieInfoResponse getBookieInfoResponse =
response.getGetBookieInfoResponse();
- StatusCode status = response.getStatus() == StatusCode.EOK
- ? getBookieInfoResponse.getStatus() : response.getStatus();
-
- long freeDiskSpace = getBookieInfoResponse.getFreeDiskSpace();
- long totalDiskSpace = getBookieInfoResponse.getTotalDiskCapacity();
-
- if (LOG.isDebugEnabled()) {
- logResponse(status, "freeDisk", freeDiskSpace, "totalDisk",
totalDiskSpace);
- }
-
- int rc = convertStatus(status, BKException.Code.ReadException);
- cb.getBookieInfoComplete(rc,
- new BookieInfo(totalDiskSpace,
- freeDiskSpace), ctx);
- }
- }
-
- class GetListOfEntriesOfLedgerCompletion extends CompletionValue {
- final GetListOfEntriesOfLedgerCallback cb;
-
- public GetListOfEntriesOfLedgerCompletion(final CompletionKey key,
- final GetListOfEntriesOfLedgerCallback origCallback, final
long ledgerId) {
- super("GetListOfEntriesOfLedger", null, ledgerId, 0L,
getListOfEntriesOfLedgerCompletionOpLogger,
- getListOfEntriesOfLedgerCompletionTimeoutOpLogger);
- this.cb = new GetListOfEntriesOfLedgerCallback() {
- @Override
- public void getListOfEntriesOfLedgerComplete(int rc, long
ledgerId,
- AvailabilityOfEntriesOfLedger
availabilityOfEntriesOfLedger) {
- logOpResult(rc);
- origCallback.getListOfEntriesOfLedgerComplete(rc,
ledgerId, availabilityOfEntriesOfLedger);
- key.release();
- }
- };
- }
-
- @Override
- public void errorOut() {
- errorOut(BKException.Code.BookieHandleNotAvailableException);
- }
-
- @Override
- public void errorOut(final int rc) {
- errorOutAndRunCallback(() ->
cb.getListOfEntriesOfLedgerComplete(rc, ledgerId, null));
- }
-
- @Override
- public void handleV3Response(BookkeeperProtocol.Response response) {
- GetListOfEntriesOfLedgerResponse getListOfEntriesOfLedgerResponse
= response
- .getGetListOfEntriesOfLedgerResponse();
- ByteBuf availabilityOfEntriesOfLedgerBuffer =
Unpooled.EMPTY_BUFFER;
- StatusCode status = response.getStatus() == StatusCode.EOK ?
getListOfEntriesOfLedgerResponse.getStatus()
- : response.getStatus();
-
- if
(getListOfEntriesOfLedgerResponse.hasAvailabilityOfEntriesOfLedger()) {
- availabilityOfEntriesOfLedgerBuffer = Unpooled.wrappedBuffer(
-
getListOfEntriesOfLedgerResponse.getAvailabilityOfEntriesOfLedger().asReadOnlyByteBuffer());
- }
-
- if (LOG.isDebugEnabled()) {
- logResponse(status, "ledgerId", ledgerId);
- }
-
- int rc = convertStatus(status, BKException.Code.ReadException);
- AvailabilityOfEntriesOfLedger availabilityOfEntriesOfLedger = null;
- if (rc == BKException.Code.OK) {
- availabilityOfEntriesOfLedger = new
AvailabilityOfEntriesOfLedger(
- availabilityOfEntriesOfLedgerBuffer.slice());
- }
- cb.getListOfEntriesOfLedgerComplete(rc, ledgerId,
availabilityOfEntriesOfLedger);
- }
- }
-
- private final Recycler<AddCompletion> addCompletionRecycler = new
Recycler<AddCompletion>() {
- @Override
- protected AddCompletion newObject(Recycler.Handle<AddCompletion>
handle) {
- return new AddCompletion(handle);
- }
- };
-
- AddCompletion acquireAddCompletion(final CompletionKey key,
- final WriteCallback originalCallback,
- final Object originalCtx,
- final long ledgerId, final long
entryId) {
- AddCompletion completion = addCompletionRecycler.get();
- completion.reset(key, originalCallback, originalCtx, ledgerId,
entryId);
- return completion;
- }
-
- // visible for testing
- class AddCompletion extends CompletionValue implements WriteCallback {
- final Recycler.Handle<AddCompletion> handle;
-
- CompletionKey key = null;
- WriteCallback originalCallback = null;
-
- AddCompletion(Recycler.Handle<AddCompletion> handle) {
- super("Add", null, -1, -1, addEntryOpLogger, addTimeoutOpLogger);
- this.handle = handle;
- }
-
- void reset(final CompletionKey key,
- final WriteCallback originalCallback,
- final Object originalCtx,
- final long ledgerId, final long entryId) {
- this.key = key;
- this.originalCallback = originalCallback;
- this.ctx = originalCtx;
- this.ledgerId = ledgerId;
- this.entryId = entryId;
- this.startTime = MathUtils.nowInNano();
- }
-
- @Override
- public void writeComplete(int rc, long ledgerId, long entryId,
- BookieId addr,
- Object ctx) {
- logOpResult(rc);
- originalCallback.writeComplete(rc, ledgerId, entryId, addr, ctx);
- key.release();
- handle.recycle(this);
- }
-
- @Override
- boolean maybeTimeout() {
- if (MathUtils.elapsedNanos(startTime) >= addEntryTimeoutNanos) {
- timeout();
- return true;
- } else {
- return false;
- }
- }
-
- @Override
- public void errorOut() {
- errorOut(BKException.Code.BookieHandleNotAvailableException);
- }
-
- @Override
- public void errorOut(final int rc) {
- errorOutAndRunCallback(
- () -> writeComplete(rc, ledgerId, entryId, bookieId, ctx));
- }
-
- @Override
- public void setOutstanding() {
- addEntryOutstanding.inc();
- }
-
- @Override
- public void handleV2Response(
- long ledgerId, long entryId, StatusCode status,
- BookieProtocol.Response response) {
- addEntryOutstanding.dec();
- handleResponse(ledgerId, entryId, status);
- }
-
- @Override
- public void handleV3Response(
- BookkeeperProtocol.Response response) {
- addEntryOutstanding.dec();
- AddResponse addResponse = response.getAddResponse();
- StatusCode status = response.getStatus() == StatusCode.EOK
- ? addResponse.getStatus() : response.getStatus();
- handleResponse(addResponse.getLedgerId(), addResponse.getEntryId(),
- status);
- }
-
- private void handleResponse(long ledgerId, long entryId,
- StatusCode status) {
- if (LOG.isDebugEnabled()) {
- logResponse(status, "ledger", ledgerId, "entry", entryId);
- }
-
- int rc = convertStatus(status, BKException.Code.WriteException);
- writeComplete(rc, ledgerId, entryId, bookieId, ctx);
- }
- }
-
// visable for testing
CompletionKey newCompletionKey(long txnId, OperationType operationType) {
return new TxnCompletionKey(txnId, operationType);
}
- class TxnCompletionKey extends CompletionKey {
- final long txnId;
-
- public TxnCompletionKey(long txnId, OperationType operationType) {
- super(operationType);
- this.txnId = txnId;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof TxnCompletionKey)) {
- return false;
- }
- TxnCompletionKey that = (TxnCompletionKey) obj;
- return this.txnId == that.txnId && this.operationType ==
that.operationType;
- }
-
- @Override
- public int hashCode() {
- return ((int) txnId);
- }
-
- @Override
- public String toString() {
- return String.format("TxnId(%d), OperationType(%s)", txnId,
operationType);
- }
-
- }
-
- abstract class CompletionKey {
- OperationType operationType;
-
- CompletionKey(OperationType operationType) {
- this.operationType = operationType;
- }
-
- public void release() {}
- }
-
- /**
- * Note : Helper functions follow
- */
-
- /**
- * @param status
- * @return {@link BKException.Code.UNINITIALIZED} if the statuscode is
unknown.
- */
- private int statusCodeToExceptionCode(StatusCode status) {
- switch (status) {
- case EOK:
- return BKException.Code.OK;
- case ENOENTRY:
- return BKException.Code.NoSuchEntryException;
- case ENOLEDGER:
- return BKException.Code.NoSuchLedgerExistsException;
- case EBADVERSION:
- return BKException.Code.ProtocolVersionException;
- case EUA:
- return BKException.Code.UnauthorizedAccessException;
- case EFENCED:
- return BKException.Code.LedgerFencedException;
- case EREADONLY:
- return BKException.Code.WriteOnReadOnlyBookieException;
- case ETOOMANYREQUESTS:
- return BKException.Code.TooManyRequestsException;
- case EUNKNOWNLEDGERSTATE:
- return BKException.Code.DataUnknownException;
- default:
- return BKException.Code.UNINITIALIZED;
- }
- }
-
private void putCompletionKeyValue(CompletionKey key, CompletionValue
value) {
CompletionValue existingValue = completionObjects.putIfAbsent(key,
value);
if (existingValue != null) { // will only happen for V2 keys, as V3
have unique txnid
@@ -2480,64 +1663,6 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
return txnIdGenerator.incrementAndGet();
}
- private final Recycler<EntryCompletionKey> v2KeyRecycler = new
Recycler<EntryCompletionKey>() {
- @Override
- protected EntryCompletionKey newObject(
- Recycler.Handle<EntryCompletionKey> handle) {
- return new EntryCompletionKey(handle);
- }
- };
-
- EntryCompletionKey acquireV2Key(long ledgerId, long entryId,
- OperationType operationType) {
- EntryCompletionKey key = v2KeyRecycler.get();
- key.reset(ledgerId, entryId, operationType);
- return key;
- }
-
- private class EntryCompletionKey extends CompletionKey {
- private final Handle<EntryCompletionKey> recyclerHandle;
- long ledgerId;
- long entryId;
-
- private EntryCompletionKey(Handle<EntryCompletionKey> handle) {
- super(null);
- this.recyclerHandle = handle;
- }
-
- void reset(long ledgerId, long entryId, OperationType operationType) {
- this.ledgerId = ledgerId;
- this.entryId = entryId;
- this.operationType = operationType;
- }
-
- @Override
- public boolean equals(Object object) {
- if (!(object instanceof EntryCompletionKey)) {
- return false;
- }
- EntryCompletionKey that = (EntryCompletionKey) object;
- return this.entryId == that.entryId
- && this.ledgerId == that.ledgerId
- && this.operationType == that.operationType;
- }
-
- @Override
- public int hashCode() {
- return Long.hashCode(ledgerId) * 31 + Long.hashCode(entryId);
- }
-
- @Override
- public String toString() {
- return String.format("%d:%d %s", ledgerId, entryId, operationType);
- }
-
- @Override
- public void release() {
- recyclerHandle.recycle(this);
- }
- }
-
Request.Builder withRequestContext(Request.Builder builder) {
if (preserveMdcForTaskExecution) {
return appendRequestContext(builder);
@@ -2706,7 +1831,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
final long txnId = getTxnId();
final CompletionKey completionKey = new TxnCompletionKey(txnId,
OperationType.START_TLS);
completionObjects.put(completionKey,
- new StartTLSCompletion(completionKey));
+ new StartTLSCompletion(completionKey, this));
BookkeeperProtocol.Request.Builder h =
withRequestContext(BookkeeperProtocol.Request.newBuilder());
BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
.setVersion(ProtocolVersion.VERSION_THREE)
@@ -2718,7 +1843,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
writeAndFlush(channel, completionKey, h.build());
}
- private void failTLS(int rc) {
+ protected void failTLS(int rc) {
LOG.error("TLS failure on: {}, rc: {}", channel, rc);
Queue<GenericCallback<PerChannelBookieClient>> oldPendingOps;
synchronized (this) {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadCompletion.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadCompletion.java
new file mode 100644
index 0000000000..d9639397b1
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadCompletion.java
@@ -0,0 +1,127 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.proto;
+
+import static org.apache.bookkeeper.client.LedgerHandle.INVALID_ENTRY_ID;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.util.ReferenceCountUtil;
+import org.apache.bookkeeper.client.BKException;
+
+class ReadCompletion extends CompletionValue {
+ final BookkeeperInternalCallbacks.ReadEntryCallback cb;
+
+ public ReadCompletion(final CompletionKey key,
+ final BookkeeperInternalCallbacks.ReadEntryCallback
originalCallback,
+ final Object originalCtx,
+ long ledgerId, final long entryId,
+ PerChannelBookieClient perChannelBookieClient) {
+ super("Read", originalCtx, ledgerId, entryId, perChannelBookieClient);
+ this.opLogger = perChannelBookieClient.readEntryOpLogger;
+ this.timeoutOpLogger = perChannelBookieClient.readTimeoutOpLogger;
+ this.cb = (rc, ledgerId1, entryId1, buffer, ctx) -> {
+ logOpResult(rc);
+ originalCallback.readEntryComplete(rc,
+ ledgerId1, entryId1,
+ buffer, originalCtx);
+ key.release();
+ };
+ }
+
+ @Override
+ public void errorOut() {
+ errorOut(BKException.Code.BookieHandleNotAvailableException);
+ }
+
+ @Override
+ public void errorOut(final int rc) {
+ errorOutAndRunCallback(
+ () -> cb.readEntryComplete(rc, ledgerId,
+ entryId, null, ctx));
+ }
+
+ @Override
+ public void setOutstanding() {
+ perChannelBookieClient.readEntryOutstanding.inc();
+ }
+
+ @Override
+ public void handleV2Response(long ledgerId, long entryId,
+ BookkeeperProtocol.StatusCode status,
+ BookieProtocol.Response response) {
+ perChannelBookieClient.readEntryOutstanding.dec();
+ if (!(response instanceof BookieProtocol.ReadResponse)) {
+ return;
+ }
+ BookieProtocol.ReadResponse readResponse =
(BookieProtocol.ReadResponse) response;
+ handleReadResponse(ledgerId, entryId, status, readResponse.getData(),
+ INVALID_ENTRY_ID, -1L);
+ }
+
+ @Override
+ public void handleV3Response(BookkeeperProtocol.Response response) {
+ perChannelBookieClient.readEntryOutstanding.dec();
+ BookkeeperProtocol.ReadResponse readResponse =
response.getReadResponse();
+ BookkeeperProtocol.StatusCode status = response.getStatus() ==
BookkeeperProtocol.StatusCode.EOK
+ ? readResponse.getStatus() : response.getStatus();
+ ByteBuf buffer = Unpooled.EMPTY_BUFFER;
+ if (readResponse.hasBody()) {
+ buffer =
Unpooled.wrappedBuffer(readResponse.getBody().asReadOnlyByteBuffer());
+ }
+ long maxLAC = INVALID_ENTRY_ID;
+ if (readResponse.hasMaxLAC()) {
+ maxLAC = readResponse.getMaxLAC();
+ }
+ long lacUpdateTimestamp = -1L;
+ if (readResponse.hasLacUpdateTimestamp()) {
+ lacUpdateTimestamp = readResponse.getLacUpdateTimestamp();
+ }
+ handleReadResponse(readResponse.getLedgerId(),
+ readResponse.getEntryId(),
+ status, buffer, maxLAC, lacUpdateTimestamp);
+ ReferenceCountUtil.release(
+ buffer); // meaningless using unpooled, but client may expect
to hold the last reference
+ }
+
+ private void handleReadResponse(long ledgerId,
+ long entryId,
+ BookkeeperProtocol.StatusCode status,
+ ByteBuf buffer,
+ long maxLAC, // max known lac piggy-back
from bookies
+ long lacUpdateTimestamp) { // the
timestamp when the lac is updated.
+ int readableBytes = buffer.readableBytes();
+ if (LOG.isDebugEnabled()) {
+ logResponse(status, "ledger", ledgerId, "entry", entryId,
"entryLength", readableBytes);
+ }
+
+ int rc = convertStatus(status, BKException.Code.ReadException);
+
+ if (maxLAC > INVALID_ENTRY_ID && (ctx instanceof
BookkeeperInternalCallbacks.ReadEntryCallbackCtx)) {
+ ((BookkeeperInternalCallbacks.ReadEntryCallbackCtx)
ctx).setLastAddConfirmed(maxLAC);
+ }
+ if (lacUpdateTimestamp > -1L && (ctx instanceof
ReadLastConfirmedAndEntryContext)) {
+ ((ReadLastConfirmedAndEntryContext)
ctx).setLacUpdateTimestamp(lacUpdateTimestamp);
+ }
+ cb.readEntryComplete(rc, ledgerId, entryId, buffer.slice(), ctx);
+ }
+}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacCompletion.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacCompletion.java
new file mode 100644
index 0000000000..a5eadbf49a
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacCompletion.java
@@ -0,0 +1,87 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.proto;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.bookkeeper.client.BKException;
+
+class ReadLacCompletion extends CompletionValue {
+ final BookkeeperInternalCallbacks.ReadLacCallback cb;
+
+ public ReadLacCompletion(final CompletionKey key,
+ BookkeeperInternalCallbacks.ReadLacCallback
originalCallback,
+ final Object ctx, final long ledgerId,
+ PerChannelBookieClient perChannelBookieClient) {
+ super("ReadLAC", ctx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED,
perChannelBookieClient);
+ this.opLogger = perChannelBookieClient.readLacOpLogger;
+ this.timeoutOpLogger = perChannelBookieClient.readLacTimeoutOpLogger;
+ this.cb = new BookkeeperInternalCallbacks.ReadLacCallback() {
+ @Override
+ public void readLacComplete(int rc, long ledgerId,
+ ByteBuf lacBuffer,
+ ByteBuf lastEntryBuffer,
+ Object ctx) {
+ logOpResult(rc);
+ originalCallback.readLacComplete(
+ rc, ledgerId, lacBuffer, lastEntryBuffer, ctx);
+ key.release();
+ }
+ };
+ }
+
+ @Override
+ public void errorOut() {
+ errorOut(BKException.Code.BookieHandleNotAvailableException);
+ }
+
+ @Override
+ public void errorOut(final int rc) {
+ errorOutAndRunCallback(
+ () -> cb.readLacComplete(rc, ledgerId, null, null, ctx));
+ }
+
+ @Override
+ public void handleV3Response(BookkeeperProtocol.Response response) {
+ BookkeeperProtocol.ReadLacResponse readLacResponse =
response.getReadLacResponse();
+ ByteBuf lacBuffer = Unpooled.EMPTY_BUFFER;
+ ByteBuf lastEntryBuffer = Unpooled.EMPTY_BUFFER;
+ BookkeeperProtocol.StatusCode status = response.getStatus() ==
BookkeeperProtocol.StatusCode.EOK
+ ? readLacResponse.getStatus() : response.getStatus();
+
+ if (readLacResponse.hasLacBody()) {
+ lacBuffer =
Unpooled.wrappedBuffer(readLacResponse.getLacBody().asReadOnlyByteBuffer());
+ }
+
+ if (readLacResponse.hasLastEntryBody()) {
+ lastEntryBuffer =
Unpooled.wrappedBuffer(readLacResponse.getLastEntryBody().asReadOnlyByteBuffer());
+ }
+
+ if (LOG.isDebugEnabled()) {
+ logResponse(status, "ledgerId", ledgerId);
+ }
+
+ int rc = convertStatus(status, BKException.Code.ReadException);
+ cb.readLacComplete(rc, ledgerId, lacBuffer.slice(),
+ lastEntryBuffer.slice(), ctx);
+ }
+}
\ No newline at end of file
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/StartTLSCompletion.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/StartTLSCompletion.java
new file mode 100644
index 0000000000..71491ed467
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/StartTLSCompletion.java
@@ -0,0 +1,75 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.proto;
+
+import org.apache.bookkeeper.client.BKException;
+
+class StartTLSCompletion extends CompletionValue {
+ final BookkeeperInternalCallbacks.StartTLSCallback cb;
+
+ public StartTLSCompletion(final CompletionKey key, PerChannelBookieClient
perChannelBookieClient) {
+ super("StartTLS", null, -1, -1, perChannelBookieClient);
+ this.opLogger = perChannelBookieClient.startTLSOpLogger;
+ this.timeoutOpLogger = perChannelBookieClient.startTLSTimeoutOpLogger;
+ this.cb = new BookkeeperInternalCallbacks.StartTLSCallback() {
+ @Override
+ public void startTLSComplete(int rc, Object ctx) {
+ logOpResult(rc);
+ key.release();
+ }
+ };
+ }
+
+ @Override
+ public void errorOut() {
+ errorOut(BKException.Code.BookieHandleNotAvailableException);
+ }
+
+ @Override
+ public void errorOut(final int rc) {
+ perChannelBookieClient.failTLS(rc);
+ }
+
+ @Override
+ public void handleV3Response(BookkeeperProtocol.Response response) {
+ BookkeeperProtocol.StatusCode status = response.getStatus();
+
+ if (LOG.isDebugEnabled()) {
+ logResponse(status);
+ }
+
+ int rc = convertStatus(status, BKException.Code.SecurityException);
+
+ // Cancel START_TLS request timeout
+ cb.startTLSComplete(rc, null);
+
+ if (perChannelBookieClient.state !=
PerChannelBookieClient.ConnectionState.START_TLS) {
+ LOG.error("Connection state changed before TLS response received");
+
perChannelBookieClient.failTLS(BKException.Code.BookieHandleNotAvailableException);
+ } else if (status != BookkeeperProtocol.StatusCode.EOK) {
+ LOG.error("Client received error {} during TLS negotiation",
status);
+ perChannelBookieClient.failTLS(BKException.Code.SecurityException);
+ } else {
+ perChannelBookieClient.initTLSHandshake();
+ }
+ }
+}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/TxnCompletionKey.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/TxnCompletionKey.java
new file mode 100644
index 0000000000..bbf64681e1
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/TxnCompletionKey.java
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.proto;
+
+class TxnCompletionKey extends CompletionKey {
+ final long txnId;
+
+ public TxnCompletionKey(long txnId, BookkeeperProtocol.OperationType
operationType) {
+ super(operationType);
+ this.txnId = txnId;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof TxnCompletionKey)) {
+ return false;
+ }
+ TxnCompletionKey that = (TxnCompletionKey) obj;
+ return this.txnId == that.txnId && this.operationType ==
that.operationType;
+ }
+
+ @Override
+ public int hashCode() {
+ return ((int) txnId);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("TxnId(%d), OperationType(%s)", txnId,
operationType);
+ }
+
+}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacCompletion.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacCompletion.java
new file mode 100644
index 0000000000..9d4ca0c39a
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacCompletion.java
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.proto;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.net.BookieId;
+
+class WriteLacCompletion extends CompletionValue {
+ final BookkeeperInternalCallbacks.WriteLacCallback cb;
+
+ public WriteLacCompletion(final CompletionKey key,
+ final
BookkeeperInternalCallbacks.WriteLacCallback originalCallback,
+ final Object originalCtx,
+ final long ledgerId,
+ PerChannelBookieClient perChannelBookieClient) {
+ super("WriteLAC",
+ originalCtx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED,
perChannelBookieClient);
+ this.opLogger = perChannelBookieClient.writeLacOpLogger;
+ this.timeoutOpLogger = perChannelBookieClient.writeLacTimeoutOpLogger;
+ this.cb = new BookkeeperInternalCallbacks.WriteLacCallback() {
+ @Override
+ public void writeLacComplete(int rc, long ledgerId,
+ BookieId addr,
+ Object ctx) {
+ logOpResult(rc);
+ originalCallback.writeLacComplete(rc, ledgerId,
+ addr, originalCtx);
+ key.release();
+ }
+ };
+ }
+
+ @Override
+ public void errorOut() {
+ errorOut(BKException.Code.BookieHandleNotAvailableException);
+ }
+
+ @Override
+ public void errorOut(final int rc) {
+ errorOutAndRunCallback(
+ () -> cb.writeLacComplete(rc, ledgerId,
perChannelBookieClient.bookieId, ctx));
+ }
+
+ @Override
+ public void handleV3Response(BookkeeperProtocol.Response response) {
+ BookkeeperProtocol.WriteLacResponse writeLacResponse =
response.getWriteLacResponse();
+ BookkeeperProtocol.StatusCode status = response.getStatus() ==
BookkeeperProtocol.StatusCode.EOK
+ ? writeLacResponse.getStatus() : response.getStatus();
+ long ledgerId = writeLacResponse.getLedgerId();
+
+ if (LOG.isDebugEnabled()) {
+ logResponse(status, "ledger", ledgerId);
+ }
+ int rc = convertStatus(status, BKException.Code.WriteException);
+ cb.writeLacComplete(rc, ledgerId, perChannelBookieClient.bookieId,
ctx);
+ }
+}