This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new e1d72cf4ec [BP-62] Refactor read op, and introduce batchReadOp. (#4190)
e1d72cf4ec is described below
commit e1d72cf4ece2ac15e88cd025aa347125207ed8cc
Author: Yan Zhao <[email protected]>
AuthorDate: Mon Feb 5 09:42:39 2024 +0800
[BP-62] Refactor read op, and introduce batchReadOp. (#4190)
### Motivation
This is the fourth PR for the batch read feature
(https://github.com/apache/bookkeeper/pull/4051).
Refactor the read op by extracting ReadOpBase, and introduce BatchedReadOp.
---
.../apache/bookkeeper/client/BatchedReadOp.java | 321 +++++++++++++
.../client/ListenerBasedPendingReadOp.java | 2 +-
.../apache/bookkeeper/client/PendingReadOp.java | 526 ++++++---------------
.../org/apache/bookkeeper/client/ReadOpBase.java | 293 ++++++++++++
.../apache/bookkeeper/client/TestParallelRead.java | 17 +-
5 files changed, 760 insertions(+), 399 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java
new file mode 100644
index 0000000000..4892882e1d
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java
@@ -0,0 +1,321 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import io.netty.buffer.ByteBuf;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
+import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.proto.BookieProtocol;
+import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.BatchedReadEntryCallback;
+import org.apache.bookkeeper.proto.checksum.DigestManager;
+import org.apache.bookkeeper.util.ByteBufList;
+import org.apache.bookkeeper.util.MathUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BatchedReadOp extends ReadOpBase implements
BatchedReadEntryCallback {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(BatchedReadOp.class);
+
+ final int maxCount;
+ final long maxSize;
+
+ BatchedLedgerEntryRequest request;
+
+ BatchedReadOp(LedgerHandle lh, // ledger handle the batch is read from
+ ClientContext clientCtx,
+ long startEntryId, // id of the first entry of the batch
+ int maxCount, // passed through to batchReadEntries() in sendReadTo()
+ long maxSize, // passed through to batchReadEntries() in sendReadTo()
+ boolean isRecoveryRead) {
+ super(lh, clientCtx, startEntryId, -1L, isRecoveryRead); // -1L fills the slot PendingReadOp uses for endEntryId
+ this.maxCount = maxCount;
+ this.maxSize = maxSize;
+ }
+
+ @Override
+ void initiate() { // starts the op: builds the single batched request and sends its first read
+ this.requestTimeNanos = MathUtils.nowInNano(); // start of the latency measured in submitCallback()
+ List<BookieId> ensemble =
getLedgerMetadata().getEnsembleAt(startEntryId); // NOTE(review): only the ensemble at startEntryId is looked up — confirm a batch never spans an ensemble change
+ request = new SequenceReadRequest(ensemble, lh.ledgerId, startEntryId,
maxCount, maxSize);
+ request.read();
+ if (clientCtx.getConf().readSpeculativeRequestPolicy.isPresent()) { // speculative reads are optional configuration
+ speculativeTask =
clientCtx.getConf().readSpeculativeRequestPolicy.get()
+ .initiateSpeculativeRequest(clientCtx.getScheduler(),
request); // the task handle is kept so submitCallback() can cancel it
+ }
+ }
+
+ @Override
+ protected void submitCallback(int code) { // resolves the op's future exactly once with either the entries or an exception
+ // Only the first caller to flip 'complete' proceeds, so the future is resolved at most once.
+ if (!complete.compareAndSet(false, true)) {
+ return;
+ }
+
+ cancelSpeculativeTask(true); // the op has an outcome, so no further speculative reads are wanted
+
+ long latencyNanos = MathUtils.elapsedNanos(requestTimeNanos);
+ if (code != BKException.Code.OK) { // NOTE(review): the constructor passes -1L as the end entry id, so the E{}-E{} range logged below presumably ends in -1 — confirm
+ LOG.error(
+ "Read of ledger entry failed: L{} E{}-E{}, Sent to {}, "
+ + "Heard from {} : bitset = {}, Error = '{}'.
First unread entry is ({}, rc = {})",
+ lh.getId(), startEntryId, endEntryId, sentToHosts,
heardFromHosts, heardFromHostsBitSet,
+ BKException.getMessage(code), startEntryId, code);
+
clientCtx.getClientStats().getReadOpLogger().registerFailedEvent(latencyNanos,
TimeUnit.NANOSECONDS);
+ // Close the request on failure. NOTE(review): request.entries is not touched here and close() is inherited from code outside this view — confirm nothing retained still needs releasing.
+
+ request.close();
+ future.completeExceptionally(BKException.create(code));
+ } else {
+
clientCtx.getClientStats().getReadOpLogger().registerSuccessfulEvent(latencyNanos,
TimeUnit.NANOSECONDS);
+ future.complete(LedgerEntriesImpl.create(request.entries));
+ }
+ }
+
+ @Override
+ public void readEntriesComplete(int rc, long ledgerId, long startEntryId,
ByteBufList bufList, Object ctx) { // callback for batchReadEntries(); ctx is the ReadContext built in sendReadTo()
+ final ReadContext rctx = (ReadContext) ctx;
+ final BatchedLedgerEntryRequest entry = (BatchedLedgerEntryRequest)
rctx.entry;
+
+ if (rc != BKException.Code.OK) { // error response: record it and let the request decide whether to try another replica
+ entry.logErrorAndReattemptRead(rctx.bookieIndex, rctx.to, "Error:
" + BKException.getMessage(rc), rc);
+ return;
+ }
+
+ heardFromHosts.add(rctx.to); // successful responders are tracked both by id and by ensemble index
+ heardFromHostsBitSet.set(rctx.bookieIndex, true);
+
+ bufList.retain(); // extra reference for the request; dropped again below when the response is not used
+ // complete() returns true only for the response that completes the request; any other response is discarded
+ if (entry.complete(rctx.bookieIndex, rctx.to, bufList)) {
+ if (!isRecoveryRead) {
+ // do not advance LastAddConfirmed for recovery reads
+ lh.updateLastConfirmed(rctx.getLastAddConfirmed(), 0L);
+ }
+ submitCallback(BKException.Code.OK);
+ } else {
+ bufList.release();
+ }
+ }
+
+ void sendReadTo(int bookieIndex, BookieId to, BatchedLedgerEntryRequest
entry) throws InterruptedException { // issues one batch read to bookie 'to'; this op is the callback
+ if (lh.throttler != null) { // the throttler is optional; when present, acquire() runs before every request
+ lh.throttler.acquire();
+ }
+ if (isRecoveryRead) { // recovery reads carry HIGH_PRIORITY | DO_FENCING flags and pass the ledger key
+ int flags = BookieProtocol.FLAG_HIGH_PRIORITY |
BookieProtocol.FLAG_DO_FENCING;
+ clientCtx.getBookieClient().batchReadEntries(to, lh.ledgerId,
entry.eId,
+ maxCount, maxSize, this, new ReadContext(bookieIndex, to,
entry), flags, lh.ledgerKey);
+ } else { // normal reads use FLAG_NONE and pass no ledger key
+ clientCtx.getBookieClient().batchReadEntries(to, lh.ledgerId,
entry.eId, maxCount, maxSize,
+ this, new ReadContext(bookieIndex, to, entry),
BookieProtocol.FLAG_NONE);
+ }
+ }
+
+ /**
+  * A read request for a batch of consecutive entries starting at {@code eId}.
+  * The response carries one buffer per entry; entry {@code i} of the response is
+  * verified and stored as entry id {@code eId + i}.
+  */
+ abstract class BatchedLedgerEntryRequest extends LedgerEntryRequest {
+
+     // Indicate which ledger the BatchedLedgerEntryRequest is reading.
+     final long lId;
+     final int maxCount;
+     final long maxSize;
+
+     // Entries built from the response that completed this request; empty until then.
+     final List<LedgerEntry> entries;
+
+     BatchedLedgerEntryRequest(List<BookieId> ensemble, long lId, long eId, int maxCount, long maxSize) {
+         super(ensemble, eId);
+         this.lId = lId;
+         this.maxCount = maxCount;
+         this.maxSize = maxSize;
+         this.entries = new ArrayList<>(maxCount);
+     }
+
+     /**
+      * Complete the read request with the batch returned by <i>host</i>.
+      *
+      * <p>The digest of every buffer is verified before the request is marked complete,
+      * in the same order used by the single-entry request in PendingReadOp. A digest
+      * mismatch is reported through {@code logErrorAndReattemptRead} and leaves the
+      * request open, so the response to the reattempted read can still complete it.
+      * Entries are only built once the whole batch has verified, so a rejected response
+      * leaves nothing behind in {@link #entries}.
+      *
+      * @param bookieIndex bookie index
+      * @param host host that responded to the read
+      * @param bufList the data buffers, one per entry, in entry-id order starting at eId
+      * @return true if this call completed the request; false if a digest check failed
+      *         or the request was already complete
+      */
+     boolean complete(int bookieIndex, BookieId host, final ByteBufList bufList) {
+         if (isComplete()) {
+             return false;
+         }
+         // Verify the whole batch before touching the completion flag: flipping the flag
+         // first would make a digest mismatch permanently block this request.
+         final int numEntries = bufList.size();
+         final List<ByteBuf> contents = new ArrayList<>(numEntries);
+         for (int i = 0; i < numEntries; i++) {
+             try {
+                 contents.add(lh.macManager.verifyDigestAndReturnData(eId + i, bufList.getBuffer(i)));
+             } catch (BKException.BKDigestMatchException e) {
+                 clientCtx.getClientStats().getReadOpDmCounter().inc();
+                 logErrorAndReattemptRead(bookieIndex, host, "Mac mismatch",
+                         BKException.Code.DigestMatchException);
+                 return false;
+             }
+         }
+         if (!complete.getAndSet(true)) {
+             rc = BKException.Code.OK;
+             for (int i = 0; i < numEntries; i++) {
+                 ByteBuf buffer = bufList.getBuffer(i);
+                 /*
+                  * The length is a long and it is the last field of the metadata of an entry.
+                  * Consequently, we have to subtract 8 from METADATA_LENGTH to get the length.
+                  */
+                 LedgerEntryImpl entryImpl = LedgerEntryImpl.create(lh.ledgerId, eId + i);
+                 entryImpl.setLength(buffer.getLong(DigestManager.METADATA_LENGTH - 8));
+                 entryImpl.setEntryBuf(contents.get(i));
+                 entries.add(entryImpl);
+             }
+             writeSet.recycle();
+             return true;
+         } else {
+             // Another response completed the request between the check above and here.
+             writeSet.recycle();
+             return false;
+         }
+     }
+
+     @Override
+     public String toString() {
+         return String.format("L%d-E%d~%d s-%d", lh.getId(), eId, eId + maxCount, maxSize);
+     }
+ }
+
+ class SequenceReadRequest extends BatchedLedgerEntryRequest {
+
+ static final int NOT_FOUND = -1;
+ int nextReplicaIndexToReadFrom = 0;
+ final BitSet sentReplicas;
+ final BitSet erroredReplicas;
+ SequenceReadRequest(List<BookieId> ensemble,
+ long lId,
+ long eId,
+ int maxCount,
+ long maxSize) {
+ super(ensemble, lId, eId, maxCount, maxSize);
+ this.sentReplicas = new
BitSet(lh.getLedgerMetadata().getWriteQuorumSize()); // bits are write-set positions a read has been sent to (set in sendNextRead)
+ this.erroredReplicas = new
BitSet(lh.getLedgerMetadata().getWriteQuorumSize()); // bits are write-set positions that reported an error (set in logErrorAndReattemptRead)
+ }
+
+ @Override
+ void read() {
+ sendNextRead(); // a sequence read starts with a single request to the first write-set position
+ }
+
+ private synchronized int getNextReplicaIndexToReadFrom() { // synchronized like sendNextRead(), which increments the counter
+ return nextReplicaIndexToReadFrom;
+ }
+
+ private BitSet getSentToBitSet() { // translates sentReplicas (write-set positions) into a bitset of ensemble indices
+ BitSet b = new BitSet(ensemble.size());
+
+ for (int i = 0; i < sentReplicas.length(); i++) {
+ if (sentReplicas.get(i)) {
+ b.set(writeSet.get(i)); // writeSet.get(i) is the ensemble index stored at write-set position i
+ }
+ }
+ return b;
+ }
+
+ private boolean readsOutstanding() { // true while more reads have been sent than have come back as errors
+ return (sentReplicas.cardinality() -
erroredReplicas.cardinality()) > 0;
+ }
+
+ @Override
+ synchronized BookieId maybeSendSpeculativeRead(BitSet heardFrom) { // returns the bookie a speculative read was sent to, or null when none was sent
+ if (nextReplicaIndexToReadFrom >=
getLedgerMetadata().getWriteQuorumSize()) { // every write-set position has already been tried
+ return null;
+ }
+
+ BitSet sentTo = getSentToBitSet();
+ sentTo.and(heardFrom); // keeps only the bookies we sent to that also appear in heardFrom
+
+ // only send another read if we have had no successful response at
all
+ // (even for other entries) from any of the other bookies we have
sent the
+ // request to
+ if (sentTo.cardinality() == 0) {
+ clientCtx.getClientStats().getSpeculativeReadCounter().inc();
+ return sendNextRead();
+ } else {
+ return null;
+ }
+ }
+
+ synchronized BookieId sendNextRead() { // sends the read to the next untried write-set position; returns the target bookie, or null when nothing was sent
+ if (nextReplicaIndexToReadFrom >=
getLedgerMetadata().getWriteQuorumSize()) {
+ // we are done, the read has failed from all replicas, just
fail the
+ // read
+ fail(firstError);
+ return null;
+ }
+
+ // ToDo: pick replica with writable PCBC. ISSUE #1239
+ // https://github.com/apache/bookkeeper/issues/1239
+ int replica = nextReplicaIndexToReadFrom;
+ int bookieIndex = writeSet.get(nextReplicaIndexToReadFrom);
+ nextReplicaIndexToReadFrom++; // advanced before sending, so the position counts as tried even if the send is interrupted
+
+ try {
+ BookieId to = ensemble.get(bookieIndex);
+ sendReadTo(bookieIndex, to, this);
+ sentToHosts.add(to);
+ sentReplicas.set(replica);
+ return to;
+ } catch (InterruptedException ie) {
+ LOG.error("Interrupted reading entry " + this, ie);
+ Thread.currentThread().interrupt(); // restore the interrupt flag for callers further up the stack
+ fail(BKException.Code.InterruptedException);
+ return null;
+ }
+ }
+
+ @Override
+ synchronized void logErrorAndReattemptRead(int bookieIndex, BookieId
host, String errMsg, int rc) { // records the error for this replica, then fails the request or moves on to the next replica
+ super.logErrorAndReattemptRead(bookieIndex, host, errMsg, rc);
+ int replica = writeSet.indexOf(bookieIndex); // map the ensemble index back to its write-set position
+ if (replica == NOT_FOUND) {
+ LOG.error("Received error from a host which is not in the
ensemble {} {}.", host, ensemble);
+ return;
+ }
+ erroredReplicas.set(replica);
+ if (isRecoveryRead && (numBookiesMissingEntry >=
requiredBookiesMissingEntryForRecovery)) {
+ /* For recovery, report NoSuchEntry as soon as wQ-aQ+1 bookies
report that they do not
+ * have the entry */
+ fail(BKException.Code.NoSuchEntryException);
+ return;
+ }
+
+ if (!readsOutstanding()) { // only move on when every read sent so far has come back as an error
+ sendNextRead();
+ }
+ }
+
+ @Override
+ boolean complete(int bookieIndex, BookieId host, final ByteBufList
bufList) { // completes via the superclass, then reports earlier-tried replicas as slow
+ boolean completed = super.complete(bookieIndex, host, bufList);
+ if (completed) {
+ int numReplicasTried = getNextReplicaIndexToReadFrom();
+ // Check if any speculative reads were issued and mark any
slow bookies before
+ // the first successful speculative read as "slow"
+ for (int i = 0; i < numReplicasTried - 1; i++) { // every write-set position except the last one tried
+ int slowBookieIndex = writeSet.get(i);
+ BookieId slowBookieSocketAddress =
ensemble.get(slowBookieIndex);
+
clientCtx.getPlacementPolicy().registerSlowBookie(slowBookieSocketAddress, eId);
+ }
+ }
+ return completed;
+ }
+ }
+}
\ No newline at end of file
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ListenerBasedPendingReadOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ListenerBasedPendingReadOp.java
index 6733b2e9ea..fedb79696a 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ListenerBasedPendingReadOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ListenerBasedPendingReadOp.java
@@ -45,7 +45,7 @@ class ListenerBasedPendingReadOp extends PendingReadOp {
@Override
protected void submitCallback(int code) {
- LedgerEntryRequest request;
+ SingleLedgerEntryRequest request;
while (!seq.isEmpty() && (request = seq.getFirst()) != null) {
if (!request.isComplete()) {
return;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index 73715859c0..15d48c6435 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -21,27 +21,16 @@
package org.apache.bookkeeper.client;
import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ListenableFuture;
import io.netty.buffer.ByteBuf;
import java.util.BitSet;
-import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
-import org.apache.bookkeeper.client.api.LedgerEntries;
-import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.proto.BookieProtocol;
import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
-import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallbackCtx;
import org.apache.bookkeeper.proto.checksum.DigestManager;
import org.apache.bookkeeper.util.MathUtils;
import org.slf4j.Logger;
@@ -54,85 +43,165 @@ import org.slf4j.LoggerFactory;
* application as soon as it arrives rather than waiting for the whole thing.
*
*/
-class PendingReadOp implements ReadEntryCallback, Runnable {
+class PendingReadOp extends ReadOpBase implements ReadEntryCallback {
private static final Logger LOG =
LoggerFactory.getLogger(PendingReadOp.class);
- private ScheduledFuture<?> speculativeTask = null;
- protected final LinkedList<LedgerEntryRequest> seq;
- private final CompletableFuture<LedgerEntries> future;
- private final Set<BookieId> heardFromHosts;
- private final BitSet heardFromHostsBitSet;
- private final Set<BookieId> sentToHosts = new HashSet<BookieId>();
- LedgerHandle lh;
- final ClientContext clientCtx;
+ protected boolean parallelRead = false;
+ protected final LinkedList<SingleLedgerEntryRequest> seq;
- long numPendingEntries;
- final long startEntryId;
- final long endEntryId;
- long requestTimeNanos;
+ PendingReadOp(LedgerHandle lh,
+ ClientContext clientCtx,
+ long startEntryId, // first entry id to read
+ long endEntryId, // last entry id to read (initiate() loops while i <= endEntryId)
+ boolean isRecoveryRead) {
+ super(lh, clientCtx, startEntryId, endEntryId, isRecoveryRead);
+ this.seq = new LinkedList<>(); // one request per entry, filled in initiate()
+ numPendingEntries = endEntryId - startEntryId + 1; // the range is inclusive, hence the + 1
+ }
+
+ PendingReadOp parallelRead(boolean enabled) { // selects ParallelReadRequest over SequenceReadRequest in initiate(); returns this for chaining
+ this.parallelRead = enabled;
+ return this;
+ }
+
+ void initiate() { // builds one request per entry in [startEntryId, endEntryId], then sends them all
+ long nextEnsembleChange = startEntryId, i = startEntryId;
+ this.requestTimeNanos = MathUtils.nowInNano(); // start of the latency measured in submitCallback()
+ List<BookieId> ensemble = null;
+ do {
+ if (i == nextEnsembleChange) { // the ensemble is re-resolved only at the entry ids where it changes
+ ensemble = getLedgerMetadata().getEnsembleAt(i);
+ nextEnsembleChange =
LedgerMetadataUtils.getNextEnsembleChange(getLedgerMetadata(), i);
+ }
+ SingleLedgerEntryRequest entry;
+ if (parallelRead) {
+ entry = new ParallelReadRequest(ensemble, lh.ledgerId, i);
+ } else {
+ entry = new SequenceReadRequest(ensemble, lh.ledgerId, i);
+ }
+ seq.add(entry);
+ i++;
+ } while (i <= endEntryId);
+ // Send the reads only after every request has been queued in seq.
+ for (LedgerEntryRequest entry : seq) {
+ entry.read();
+ if (!parallelRead &&
clientCtx.getConf().readSpeculativeRequestPolicy.isPresent()) {
+ speculativeTask =
clientCtx.getConf().readSpeculativeRequestPolicy.get()
+ .initiateSpeculativeRequest(clientCtx.getScheduler(),
entry); // NOTE(review): the field is overwritten on every iteration, so only the last entry's task stays reachable for cancellation — confirm this is intended
+ }
+ }
+ }
- final int requiredBookiesMissingEntryForRecovery;
- final boolean isRecoveryRead;
+ @Override
+ public void readEntryComplete(int rc, long ledgerId, final long entryId,
final ByteBuf buffer, Object ctx) { // callback for readEntry(); ctx is the ReadContext built in sendReadTo()
+ final ReadContext rctx = (ReadContext) ctx;
+ final SingleLedgerEntryRequest entry = (SingleLedgerEntryRequest)
rctx.entry;
+
+ if (rc != BKException.Code.OK) { // error response: record it and let the request decide whether to try another replica
+ entry.logErrorAndReattemptRead(rctx.bookieIndex, rctx.to, "Error:
" + BKException.getMessage(rc), rc);
+ return;
+ }
+
+ heardFromHosts.add(rctx.to); // successful responders are tracked both by id and by ensemble index
+ heardFromHostsBitSet.set(rctx.bookieIndex, true);
+
+ buffer.retain(); // extra reference for the request; dropped again below when the response is not used
+ // complete() returns true only for the response that completes the entry; any other response is discarded
+ if (entry.complete(rctx.bookieIndex, rctx.to, buffer)) {
+ if (!isRecoveryRead) {
+ // do not advance LastAddConfirmed for recovery reads
+ lh.updateLastConfirmed(rctx.getLastAddConfirmed(), 0L);
+ }
+ submitCallback(BKException.Code.OK);
+ } else {
+ buffer.release();
+ }
+
+ if (numPendingEntries < 0) { // submitCallback() decrements the counter once per completed entry; below zero means more completions than entries
+ LOG.error("Read too many values for ledger {} : [{}, {}].",
+ ledgerId, startEntryId, endEntryId);
+ }
+
+ }
+
+ protected void submitCallback(int code) {
+ if (BKException.Code.OK == code) {
+ numPendingEntries--;
+ if (numPendingEntries != 0) {
+ return;
+ }
+ }
+
+ // ensure callback once
+ if (!complete.compareAndSet(false, true)) {
+ return;
+ }
- boolean parallelRead = false;
- final AtomicBoolean complete = new AtomicBoolean(false);
- boolean allowFailFast = false;
+ cancelSpeculativeTask(true);
- abstract class LedgerEntryRequest implements SpeculativeRequestExecutor,
AutoCloseable {
+ long latencyNanos = MathUtils.elapsedNanos(requestTimeNanos);
+ if (code != BKException.Code.OK) {
+ long firstUnread = LedgerHandle.INVALID_ENTRY_ID;
+ Integer firstRc = null;
+ for (LedgerEntryRequest req : seq) {
+ if (!req.isComplete()) {
+ firstUnread = req.eId;
+ firstRc = req.rc;
+ break;
+ }
+ }
+ LOG.error(
+ "Read of ledger entry failed: L{} E{}-E{}, Sent to {}, "
+ + "Heard from {} : bitset = {}, Error = '{}'.
First unread entry is ({}, rc = {})",
+ lh.getId(), startEntryId, endEntryId, sentToHosts,
heardFromHosts, heardFromHostsBitSet,
+ BKException.getMessage(code), firstUnread, firstRc);
+
clientCtx.getClientStats().getReadOpLogger().registerFailedEvent(latencyNanos,
TimeUnit.NANOSECONDS);
+ // release the entries
+ seq.forEach(LedgerEntryRequest::close);
+ future.completeExceptionally(BKException.create(code));
+ } else {
+
clientCtx.getClientStats().getReadOpLogger().registerSuccessfulEvent(latencyNanos,
TimeUnit.NANOSECONDS);
+ future.complete(LedgerEntriesImpl.create(Lists.transform(seq,
input -> input.entryImpl)));
+ }
+ }
- final AtomicBoolean complete = new AtomicBoolean(false);
+ void sendReadTo(int bookieIndex, BookieId to, SingleLedgerEntryRequest
entry) throws InterruptedException {
+ if (lh.throttler != null) {
+ lh.throttler.acquire();
+ }
- int rc = BKException.Code.OK;
- int firstError = BKException.Code.OK;
- int numBookiesMissingEntry = 0;
+ if (isRecoveryRead) {
+ int flags = BookieProtocol.FLAG_HIGH_PRIORITY |
BookieProtocol.FLAG_DO_FENCING;
+ clientCtx.getBookieClient().readEntry(to, lh.ledgerId, entry.eId,
+ this, new ReadContext(bookieIndex, to, entry), flags,
lh.ledgerKey);
+ } else {
+ clientCtx.getBookieClient().readEntry(to, lh.ledgerId, entry.eId,
+ this, new ReadContext(bookieIndex, to, entry),
BookieProtocol.FLAG_NONE);
+ }
+ }
- final List<BookieId> ensemble;
- final DistributionSchedule.WriteSet writeSet;
+ abstract class SingleLedgerEntryRequest extends LedgerEntryRequest {
final LedgerEntryImpl entryImpl;
- final long eId;
- LedgerEntryRequest(List<BookieId> ensemble, long lId, long eId) {
+ SingleLedgerEntryRequest(List<BookieId> ensemble, long lId, long eId) {
+ super(ensemble, eId);
this.entryImpl = LedgerEntryImpl.create(lId, eId);
- this.ensemble = ensemble;
- this.eId = eId;
-
- if (clientCtx.getConf().enableReorderReadSequence) {
- writeSet = clientCtx.getPlacementPolicy()
- .reorderReadSequence(
- ensemble,
- lh.getBookiesHealthInfo(),
- lh.getWriteSetForReadOperation(eId));
- } else {
- writeSet = lh.getWriteSetForReadOperation(eId);
- }
}
@Override
public void close() {
- // this request has succeeded before, can't recycle writeSet again
- if (complete.compareAndSet(false, true)) {
- rc = BKException.Code.UnexpectedConditionException;
- writeSet.recycle();
- }
+ super.close();
entryImpl.close();
}
- /**
- * Execute the read request.
- */
- abstract void read();
-
/**
* Complete the read request from <i>host</i>.
*
- * @param bookieIndex
- * bookie index
- * @param host
- * host that respond the read
- * @param buffer
- * the data buffer
+ * @param bookieIndex bookie index
+ * @param host host that respond the read
+ * @param buffer the data buffer
* @return return true if we managed to complete the entry;
- * otherwise return false if the read entry is not complete or
it is already completed before
+ * otherwise return false if the read entry is not complete or it is
already completed before
*/
boolean complete(int bookieIndex, BookieId host, final ByteBuf buffer)
{
ByteBuf content;
@@ -141,7 +210,7 @@ class PendingReadOp implements ReadEntryCallback, Runnable {
}
try {
content = lh.macManager.verifyDigestAndReturnData(eId, buffer);
- } catch (BKDigestMatchException e) {
+ } catch (BKException.BKDigestMatchException e) {
clientCtx.getClientStats().getReadOpDmCounter().inc();
logErrorAndReattemptRead(bookieIndex, host, "Mac mismatch",
BKException.Code.DigestMatchException);
return false;
@@ -161,125 +230,9 @@ class PendingReadOp implements ReadEntryCallback,
Runnable {
return false;
}
}
-
- /**
- * Fail the request with given result code <i>rc</i>.
- *
- * @param rc
- * result code to fail the request.
- * @return true if we managed to fail the entry; otherwise return
false if it already failed or completed.
- */
- boolean fail(int rc) {
- if (complete.compareAndSet(false, true)) {
- this.rc = rc;
- submitCallback(rc);
- return true;
- } else {
- return false;
- }
- }
-
- /**
- * Log error <i>errMsg</i> and reattempt read from <i>host</i>.
- *
- * @param bookieIndex
- * bookie index
- * @param host
- * host that just respond
- * @param errMsg
- * error msg to log
- * @param rc
- * read result code
- */
- synchronized void logErrorAndReattemptRead(int bookieIndex, BookieId
host, String errMsg, int rc) {
- if (BKException.Code.OK == firstError
- || BKException.Code.NoSuchEntryException == firstError
- || BKException.Code.NoSuchLedgerExistsException == firstError)
{
- firstError = rc;
- } else if (BKException.Code.BookieHandleNotAvailableException ==
firstError
- && BKException.Code.NoSuchEntryException != rc
- && BKException.Code.NoSuchLedgerExistsException != rc) {
- // if other exception rather than NoSuchEntryException or
NoSuchLedgerExistsException is
- // returned we need to update firstError to indicate that it
might be a valid read but just
- // failed.
- firstError = rc;
- }
- if (BKException.Code.NoSuchEntryException == rc
- || BKException.Code.NoSuchLedgerExistsException == rc) {
- ++numBookiesMissingEntry;
- if (LOG.isDebugEnabled()) {
- LOG.debug("No such entry found on bookie. L{} E{} bookie:
{}",
- lh.ledgerId, eId, host);
- }
- } else {
- if (LOG.isInfoEnabled()) {
- LOG.info("{} while reading L{} E{} from bookie: {}",
- errMsg, lh.ledgerId, eId, host);
- }
- }
-
- lh.recordReadErrorOnBookie(bookieIndex);
- }
-
- /**
- * Send to next replica speculatively, if required and possible.
- * This returns the host we may have sent to for unit testing.
- *
- * @param heardFromHostsBitSet
- * the set of hosts that we already received responses.
- * @return host we sent to if we sent. null otherwise.
- */
- abstract BookieId maybeSendSpeculativeRead(BitSet
heardFromHostsBitSet);
-
- /**
- * Whether the read request completed.
- *
- * @return true if the read request is completed.
- */
- boolean isComplete() {
- return complete.get();
- }
-
- /**
- * Get result code of this entry.
- *
- * @return result code.
- */
- int getRc() {
- return rc;
- }
-
- @Override
- public String toString() {
- return String.format("L%d-E%d", lh.getId(), eId);
- }
-
- /**
- * Issues a speculative request and indicates if more speculative
- * requests should be issued.
- *
- * @return whether more speculative requests should be issued
- */
- @Override
- public ListenableFuture<Boolean> issueSpeculativeRequest() {
- return clientCtx.getMainWorkerPool().submitOrdered(lh.getId(), new
Callable<Boolean>() {
- @Override
- public Boolean call() throws Exception {
- if (!isComplete() && null !=
maybeSendSpeculativeRead(heardFromHostsBitSet)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Send speculative read for {}. Hosts
sent are {}, "
- + " Hosts heard are {}, ensemble
is {}.",
- this, sentToHosts, heardFromHostsBitSet,
ensemble);
- }
- return true;
- }
- return false;
- }
- });
- }
}
- class ParallelReadRequest extends LedgerEntryRequest {
+ class ParallelReadRequest extends SingleLedgerEntryRequest {
int numPendings;
@@ -326,7 +279,7 @@ class PendingReadOp implements ReadEntryCallback, Runnable {
}
}
- class SequenceReadRequest extends LedgerEntryRequest {
+ class SequenceReadRequest extends SingleLedgerEntryRequest {
static final int NOT_FOUND = -1;
int nextReplicaIndexToReadFrom = 0;
@@ -456,205 +409,4 @@ class PendingReadOp implements ReadEntryCallback,
Runnable {
return completed;
}
}
-
- PendingReadOp(LedgerHandle lh,
- ClientContext clientCtx,
- long startEntryId,
- long endEntryId,
- boolean isRecoveryRead) {
- this.seq = new LinkedList<>();
- this.future = new CompletableFuture<>();
- this.lh = lh;
- this.clientCtx = clientCtx;
- this.startEntryId = startEntryId;
- this.endEntryId = endEntryId;
- this.isRecoveryRead = isRecoveryRead;
-
- this.allowFailFast = false;
- numPendingEntries = endEntryId - startEntryId + 1;
- requiredBookiesMissingEntryForRecovery =
getLedgerMetadata().getWriteQuorumSize()
- - getLedgerMetadata().getAckQuorumSize() + 1;
- heardFromHosts = new HashSet<>();
- heardFromHostsBitSet = new
BitSet(getLedgerMetadata().getEnsembleSize());
- }
-
- CompletableFuture<LedgerEntries> future() {
- return future;
- }
-
- protected LedgerMetadata getLedgerMetadata() {
- return lh.getLedgerMetadata();
- }
-
- protected void cancelSpeculativeTask(boolean mayInterruptIfRunning) {
- if (speculativeTask != null) {
- speculativeTask.cancel(mayInterruptIfRunning);
- speculativeTask = null;
- }
- }
-
- public ScheduledFuture<?> getSpeculativeTask() {
- return speculativeTask;
- }
-
- PendingReadOp parallelRead(boolean enabled) {
- this.parallelRead = enabled;
- return this;
- }
-
- void allowFailFastOnUnwritableChannel() {
- allowFailFast = true;
- }
-
- public void submit() {
- clientCtx.getMainWorkerPool().executeOrdered(lh.ledgerId, this);
- }
-
- void initiate() {
- long nextEnsembleChange = startEntryId, i = startEntryId;
- this.requestTimeNanos = MathUtils.nowInNano();
- List<BookieId> ensemble = null;
- do {
- if (i == nextEnsembleChange) {
- ensemble = getLedgerMetadata().getEnsembleAt(i);
- nextEnsembleChange =
LedgerMetadataUtils.getNextEnsembleChange(getLedgerMetadata(), i);
- }
- LedgerEntryRequest entry;
- if (parallelRead) {
- entry = new ParallelReadRequest(ensemble, lh.ledgerId, i);
- } else {
- entry = new SequenceReadRequest(ensemble, lh.ledgerId, i);
- }
- seq.add(entry);
- i++;
- } while (i <= endEntryId);
- // read the entries.
- for (LedgerEntryRequest entry : seq) {
- entry.read();
- if (!parallelRead &&
clientCtx.getConf().readSpeculativeRequestPolicy.isPresent()) {
- speculativeTask =
clientCtx.getConf().readSpeculativeRequestPolicy.get()
- .initiateSpeculativeRequest(clientCtx.getScheduler(),
entry);
- }
- }
- }
-
- @Override
- public void run() {
- initiate();
- }
-
- private static class ReadContext implements ReadEntryCallbackCtx {
- final int bookieIndex;
- final BookieId to;
- final LedgerEntryRequest entry;
- long lac = LedgerHandle.INVALID_ENTRY_ID;
-
- ReadContext(int bookieIndex, BookieId to, LedgerEntryRequest entry) {
- this.bookieIndex = bookieIndex;
- this.to = to;
- this.entry = entry;
- }
-
- @Override
- public void setLastAddConfirmed(long lac) {
- this.lac = lac;
- }
-
- @Override
- public long getLastAddConfirmed() {
- return lac;
- }
- }
-
- private static ReadContext createReadContext(int bookieIndex, BookieId to,
LedgerEntryRequest entry) {
- return new ReadContext(bookieIndex, to, entry);
- }
-
- void sendReadTo(int bookieIndex, BookieId to, LedgerEntryRequest entry)
throws InterruptedException {
- if (lh.throttler != null) {
- lh.throttler.acquire();
- }
-
- if (isRecoveryRead) {
- int flags = BookieProtocol.FLAG_HIGH_PRIORITY |
BookieProtocol.FLAG_DO_FENCING;
- clientCtx.getBookieClient().readEntry(to, lh.ledgerId, entry.eId,
- this, new ReadContext(bookieIndex, to, entry), flags,
lh.ledgerKey);
- } else {
- clientCtx.getBookieClient().readEntry(to, lh.ledgerId, entry.eId,
- this, new ReadContext(bookieIndex, to, entry),
BookieProtocol.FLAG_NONE);
- }
- }
-
- @Override
- public void readEntryComplete(int rc, long ledgerId, final long entryId,
final ByteBuf buffer, Object ctx) {
- final ReadContext rctx = (ReadContext) ctx;
- final LedgerEntryRequest entry = rctx.entry;
-
- if (rc != BKException.Code.OK) {
- entry.logErrorAndReattemptRead(rctx.bookieIndex, rctx.to, "Error:
" + BKException.getMessage(rc), rc);
- return;
- }
-
- heardFromHosts.add(rctx.to);
- heardFromHostsBitSet.set(rctx.bookieIndex, true);
-
- buffer.retain();
- // if entry has completed don't handle twice
- if (entry.complete(rctx.bookieIndex, rctx.to, buffer)) {
- if (!isRecoveryRead) {
- // do not advance LastAddConfirmed for recovery reads
- lh.updateLastConfirmed(rctx.getLastAddConfirmed(), 0L);
- }
- submitCallback(BKException.Code.OK);
- } else {
- buffer.release();
- }
-
- if (numPendingEntries < 0) {
- LOG.error("Read too many values for ledger {} : [{}, {}].",
- ledgerId, startEntryId, endEntryId);
- }
- }
-
- protected void submitCallback(int code) {
- if (BKException.Code.OK == code) {
- numPendingEntries--;
- if (numPendingEntries != 0) {
- return;
- }
- }
-
- // ensure callback once
- if (!complete.compareAndSet(false, true)) {
- return;
- }
-
- cancelSpeculativeTask(true);
-
- long latencyNanos = MathUtils.elapsedNanos(requestTimeNanos);
- if (code != BKException.Code.OK) {
- long firstUnread = LedgerHandle.INVALID_ENTRY_ID;
- Integer firstRc = null;
- for (LedgerEntryRequest req : seq) {
- if (!req.isComplete()) {
- firstUnread = req.eId;
- firstRc = req.rc;
- break;
- }
- }
- LOG.error(
- "Read of ledger entry failed: L{} E{}-E{}, Sent to {}, "
- + "Heard from {} : bitset = {}, Error = '{}'.
First unread entry is ({}, rc = {})",
- lh.getId(), startEntryId, endEntryId, sentToHosts,
heardFromHosts, heardFromHostsBitSet,
- BKException.getMessage(code), firstUnread, firstRc);
-
clientCtx.getClientStats().getReadOpLogger().registerFailedEvent(latencyNanos,
TimeUnit.NANOSECONDS);
- // release the entries
- seq.forEach(LedgerEntryRequest::close);
- future.completeExceptionally(BKException.create(code));
- } else {
-
clientCtx.getClientStats().getReadOpLogger().registerSuccessfulEvent(latencyNanos,
TimeUnit.NANOSECONDS);
- future.complete(LedgerEntriesImpl.create(Lists.transform(seq,
input -> input.entryImpl)));
- }
- }
-
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOpBase.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOpBase.java
new file mode 100644
index 0000000000..cbd68ec657
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOpBase.java
@@ -0,0 +1,293 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.BitSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class ReadOpBase implements Runnable {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ReadOpBase.class);
+
+ protected ScheduledFuture<?> speculativeTask = null;
+ protected final CompletableFuture<LedgerEntries> future;
+ protected final Set<BookieId> heardFromHosts;
+ protected final BitSet heardFromHostsBitSet;
+ protected final Set<BookieId> sentToHosts = new HashSet<BookieId>();
+ LedgerHandle lh;
+ protected ClientContext clientCtx;
+
+ protected final long startEntryId;
+ protected long requestTimeNanos;
+
+ protected final int requiredBookiesMissingEntryForRecovery;
+ protected final boolean isRecoveryRead;
+
+ protected final AtomicBoolean complete = new AtomicBoolean(false);
+ protected boolean allowFailFast = false;
+ long numPendingEntries;
+ final long endEntryId;
+ /**
+ * Sets up the state shared by read operations on a ledger.
+ *
+ * <p>{@code numPendingEntries} and {@code requestTimeNanos} are not
+ * assigned by this constructor.
+ *
+ * @param lh
+ * ledger handle the read is issued against; its metadata supplies
+ * the quorum and ensemble sizes used below
+ * @param clientCtx
+ * client context (worker pool, configuration, placement policy)
+ * @param startEntryId
+ * entry id stored as the start of the read
+ * @param endEntryId
+ * entry id stored as the end of the read
+ * @param isRecoveryRead
+ * whether this read is a recovery read
+ */
+ protected ReadOpBase(LedgerHandle lh, ClientContext clientCtx, long
startEntryId, long endEntryId,
+ boolean isRecoveryRead) {
+ this.lh = lh;
+ this.future = new CompletableFuture<>();
+ this.startEntryId = startEntryId;
+ this.endEntryId = endEntryId;
+ this.isRecoveryRead = isRecoveryRead;
+ // writeQuorumSize - ackQuorumSize + 1.
+ // NOTE(review): presumably the number of bookies that must report the
+ // entry missing before a recovery read gives up on it - confirm
+ // against the users of this field.
+ this.requiredBookiesMissingEntryForRecovery =
getLedgerMetadata().getWriteQuorumSize()
+ - getLedgerMetadata().getAckQuorumSize() + 1;
+ this.heardFromHosts = new HashSet<>();
+ // Initial capacity of the bit set is the ensemble size.
+ this.heardFromHostsBitSet = new
BitSet(getLedgerMetadata().getEnsembleSize());
+ // Same value as the field initializer; allowFailFastOnUnwritableChannel()
+ // is what switches it on.
+ this.allowFailFast = false;
+ this.clientCtx = clientCtx;
+ }
+
+ protected LedgerMetadata getLedgerMetadata() {
+ return lh.getLedgerMetadata();
+ }
+
+ protected void cancelSpeculativeTask(boolean mayInterruptIfRunning) {
+ if (speculativeTask != null) {
+ speculativeTask.cancel(mayInterruptIfRunning);
+ speculativeTask = null;
+ }
+ }
+
+ public ScheduledFuture<?> getSpeculativeTask() {
+ return speculativeTask;
+ }
+
+ CompletableFuture<LedgerEntries> future() {
+ return future;
+ }
+
+ void allowFailFastOnUnwritableChannel() {
+ allowFailFast = true;
+ }
+
+ public void submit() {
+ clientCtx.getMainWorkerPool().executeOrdered(lh.ledgerId, this);
+ }
+
+ @Override
+ public void run() {
+ initiate();
+ }
+
+ abstract void initiate();
+
+ protected abstract void submitCallback(int code);
+
+ abstract class LedgerEntryRequest implements SpeculativeRequestExecutor {
+
+ final AtomicBoolean complete = new AtomicBoolean(false);
+
+ int rc = BKException.Code.OK;
+ int firstError = BKException.Code.OK;
+ int numBookiesMissingEntry = 0;
+
+ final long eId;
+
+ final List<BookieId> ensemble;
+ final DistributionSchedule.WriteSet writeSet;
+
+
+ LedgerEntryRequest(List<BookieId> ensemble, final long eId) {
+ this.ensemble = ensemble;
+ this.eId = eId;
+ if (clientCtx.getConf().enableReorderReadSequence) {
+ writeSet = clientCtx.getPlacementPolicy()
+ .reorderReadSequence(
+ ensemble,
+ lh.getBookiesHealthInfo(),
+ lh.getWriteSetForReadOperation(eId));
+ } else {
+ writeSet = lh.getWriteSetForReadOperation(eId);
+ }
+ }
+
+ /**
+ * Closes this request if it has not completed yet.
+ *
+ * <p>When the {@code complete} flag can still be flipped, the result
+ * code is set to {@code UnexpectedConditionException} and the write
+ * set is recycled. When the flag was already set, nothing happens.
+ */
+ public void close() {
+ // Only the caller that flips the flag recycles writeSet here; a request
+ // that already completed is skipped so writeSet is not recycled again.
+ if (complete.compareAndSet(false, true)) {
+ rc = BKException.Code.UnexpectedConditionException;
+ writeSet.recycle();
+ }
+ }
+
+ /**
+ * Execute the read request.
+ */
+ abstract void read();
+
+ /**
+ * Fail the request with given result code <i>rc</i>.
+ *
+ * @param rc
+ * result code to fail the request.
+ * @return true if we managed to fail the entry; otherwise return
false if it already failed or completed.
+ */
+ boolean fail(int rc) {
+ if (complete.compareAndSet(false, true)) {
+ this.rc = rc;
+ writeSet.recycle();
+ submitCallback(rc);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * Record a failed read response for this entry from <i>host</i>.
+ *
+ * <p>Updates {@code firstError}, increments
+ * {@code numBookiesMissingEntry} when <i>rc</i> is
+ * {@code NoSuchEntryException} or {@code NoSuchLedgerExistsException},
+ * logs the failure and records the read error on the ledger handle.
+ * This method does not send another read itself.
+ * NOTE(review): presumably subclasses override it to do the actual
+ * reattempt - confirm.
+ *
+ * @param bookieIndex
+ * bookie index
+ * @param host
+ * host that just responded
+ * @param errMsg
+ * error msg to log; only used when <i>rc</i> is not one of the
+ * two "missing" codes above
+ * @param rc
+ * read result code
+ */
+ synchronized void logErrorAndReattemptRead(int bookieIndex, BookieId
host, String errMsg, int rc) {
+ // While firstError is still OK or one of the "missing" codes, the
+ // latest result code replaces it.
+ if (BKException.Code.OK == firstError
+ || BKException.Code.NoSuchEntryException == firstError
+ || BKException.Code.NoSuchLedgerExistsException ==
firstError) {
+ firstError = rc;
+ } else if (BKException.Code.BookieHandleNotAvailableException ==
firstError
+ && BKException.Code.NoSuchEntryException != rc
+ && BKException.Code.NoSuchLedgerExistsException != rc) {
+ // If an exception other than NoSuchEntryException or
+ // NoSuchLedgerExistsException is returned, we need to update
+ // firstError to indicate that it might be a valid read but just
+ // failed.
+ firstError = rc;
+ }
+ if (BKException.Code.NoSuchEntryException == rc
+ || BKException.Code.NoSuchLedgerExistsException == rc) {
+ ++numBookiesMissingEntry;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No such entry found on bookie. L{} E{} bookie:
{}",
+ lh.ledgerId, eId, host);
+ }
+ } else {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("{} while reading L{} E{} from bookie: {}",
+ errMsg, lh.ledgerId, eId, host);
+ }
+ }
+
+ // Reported for every failure, whatever the result code.
+ lh.recordReadErrorOnBookie(bookieIndex);
+ }
+
+ /**
+ * Send to next replica speculatively, if required and possible.
+ * This returns the host we may have sent to for unit testing.
+ *
+ * @param heardFromHostsBitSet
+ * the set of hosts that we already received responses.
+ * @return host we sent to if we sent. null otherwise.
+ */
+ abstract BookieId maybeSendSpeculativeRead(BitSet
heardFromHostsBitSet);
+
+ /**
+ * Whether the read request completed.
+ *
+ * @return true if the read request is completed.
+ */
+ boolean isComplete() {
+ return complete.get();
+ }
+
+ /**
+ * Get result code of this entry.
+ *
+ * @return result code.
+ */
+ int getRc() {
+ return rc;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("L%d-E%d", lh.getId(), eId);
+ }
+
+ /**
+ * Issues a speculative request and indicates if more speculative
+ * requests should be issued.
+ *
+ * <p>The attempt is submitted to the main worker pool, ordered by the
+ * ledger id. It sends a speculative read only while this request is
+ * not complete.
+ *
+ * @return a future yielding true when a speculative read was sent (so
+ * more may be issued), false when the request is already
+ * complete or no read was sent
+ */
+ @Override
+ public ListenableFuture<Boolean> issueSpeculativeRequest() {
+ return clientCtx.getMainWorkerPool().submitOrdered(lh.getId(), new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ if (!isComplete() && null != maybeSendSpeculativeRead(heardFromHostsBitSet)) {
+ if (LOG.isDebugEnabled()) {
+ // Inside this anonymous Callable a bare "this" is the
+ // Callable; qualify it so the log shows the request's
+ // own toString() ("L<ledger>-E<entry>").
+ LOG.debug("Send speculative read for {}. Hosts sent are {}, "
+ + " Hosts heard are {}, ensemble is {}.",
+ LedgerEntryRequest.this, sentToHosts, heardFromHostsBitSet, ensemble);
+ }
+ return true;
+ }
+ return false;
+ }
+ });
+ }
+ }
+
+ /**
+ * Callback context for a single read sent to one bookie.
+ *
+ * <p>Holds the bookie's index, the bookie the read was addressed to and
+ * the entry request it belongs to, plus the last-add-confirmed value
+ * handed back through {@link #setLastAddConfirmed(long)}. Until that
+ * setter is called, {@link #getLastAddConfirmed()} returns
+ * {@code LedgerHandle.INVALID_ENTRY_ID}.
+ *
+ * <p>NOTE(review): the entry type is spelled
+ * {@code PendingReadOp.LedgerEntryRequest} although this is the base
+ * class; presumably it resolves to the {@code LedgerEntryRequest}
+ * declared in {@code ReadOpBase} - confirm.
+ */
+ protected static class ReadContext implements
BookkeeperInternalCallbacks.ReadEntryCallbackCtx {
+ final int bookieIndex;
+ final BookieId to;
+ final PendingReadOp.LedgerEntryRequest entry;
+ // Stays at INVALID_ENTRY_ID until setLastAddConfirmed is called.
+ long lac = LedgerHandle.INVALID_ENTRY_ID;
+
+ ReadContext(int bookieIndex, BookieId to,
PendingReadOp.LedgerEntryRequest entry) {
+ this.bookieIndex = bookieIndex;
+ this.to = to;
+ this.entry = entry;
+ }
+
+ @Override
+ public void setLastAddConfirmed(long lac) {
+ this.lac = lac;
+ }
+
+ @Override
+ public long getLastAddConfirmed() {
+ return lac;
+ }
+ }
+}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
index f2ada1e5dc..423e02b4aa 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
@@ -33,7 +33,6 @@ import static org.mockito.Mockito.mock;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
-import java.lang.reflect.Method;
import java.util.Iterator;
import java.util.List;
import java.util.TreeMap;
@@ -268,8 +267,8 @@ public class TestParallelRead extends
BookKeeperClusterTestCase {
PendingReadOp pendingReadOp = new PendingReadOp(lh, clientContext, 1,
2, false);
pendingReadOp.parallelRead(true);
pendingReadOp.initiate();
- PendingReadOp.LedgerEntryRequest first = pendingReadOp.seq.get(0);
- PendingReadOp.LedgerEntryRequest second = pendingReadOp.seq.get(1);
+ PendingReadOp.SingleLedgerEntryRequest first =
pendingReadOp.seq.get(0);
+ PendingReadOp.SingleLedgerEntryRequest second =
pendingReadOp.seq.get(1);
pendingReadOp.submitCallback(-105);
@@ -287,13 +286,9 @@ public class TestParallelRead extends
BookKeeperClusterTestCase {
assertTrue(second.complete.get());
// Mock ledgerEntryImpl reuse
- Method method =
PendingReadOp.class.getDeclaredMethod("createReadContext",
- int.class, BookieId.class,
PendingReadOp.LedgerEntryRequest.class);
- method.setAccessible(true);
-
ByteBuf byteBuf = Unpooled.buffer(10);
pendingReadOp.readEntryComplete(BKException.Code.OK, 1, 1,
Unpooled.buffer(10),
- method.invoke(pendingReadOp, 1, BookieId.parse("test"),
first));
+ new ReadOpBase.ReadContext(1, BookieId.parse("test"), first));
// byteBuf has been release
assertEquals(byteBuf.refCnt(), 1);
@@ -308,15 +303,15 @@ public class TestParallelRead extends
BookKeeperClusterTestCase {
// read entry failed twice, will not close twice
pendingReadOp.readEntryComplete(BKException.Code.TooManyRequestsException, 1,
1, Unpooled.buffer(10),
- method.invoke(pendingReadOp, 1, BookieId.parse("test"),
first));
+ new ReadOpBase.ReadContext(1, BookieId.parse("test"), first));
pendingReadOp.readEntryComplete(BKException.Code.TooManyRequestsException, 1,
1, Unpooled.buffer(10),
- method.invoke(pendingReadOp, 1, BookieId.parse("test"),
first));
+ new ReadOpBase.ReadContext(1, BookieId.parse("test"), first));
// will not complete twice when completed
byteBuf = Unpooled.buffer(10);
pendingReadOp.readEntryComplete(Code.OK, 1, 1, Unpooled.buffer(10),
- method.invoke(pendingReadOp, 1, BookieId.parse("test"),
first));
+ new ReadOpBase.ReadContext(1, BookieId.parse("test"), first));
assertEquals(1, byteBuf.refCnt());
}