This is an automated email from the ASF dual-hosted git repository.
ayegorov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 0f22d23 fix: Bookkeeper client throttling logic is based upon entryId
instead of ledgerId
0f22d23 is described below
commit 0f22d238c225ec667c58e1c5029644478d636128
Author: Andrey Yegorov <[email protected]>
AuthorDate: Mon May 3 11:44:55 2021 -0700
fix: Bookkeeper client throttling logic is based upon entryId instead of
ledgerId
Descriptions of the changes in this PR:
Fixes: #2660
### Changes
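Changed isWriteSetWritable() to use ledgerId instead of the per-entry key (entryId) for the client selection, and removed the now-unused key parameter from isWriteSetWritable() and waitForWritable().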
Master Issue: #2660
Reviewers: Enrico Olivelli <[email protected]>, Ivan Kelly
<[email protected]>
This closes #2664 from dlg99/fix/2660
---
.../java/org/apache/bookkeeper/client/LedgerHandle.java | 16 ++++++++--------
.../org/apache/bookkeeper/client/LedgerHandleAdv.java | 2 +-
2 files changed, 9 insertions(+), 9 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 806dd8b..48818bd 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -880,7 +880,7 @@ public class LedgerHandle implements WriteHandle {
// unresponsive thus helpful enough.
DistributionSchedule.WriteSet ws =
distributionSchedule.getWriteSet(firstEntry);
try {
- if (!waitForWritable(ws, firstEntry, ws.size() - 1,
clientCtx.getConf().waitForWriteSetMs)) {
+ if (!waitForWritable(ws, ws.size() - 1,
clientCtx.getConf().waitForWriteSetMs)) {
op.allowFailFastOnUnwritableChannel();
}
} finally {
@@ -1213,7 +1213,7 @@ public class LedgerHandle implements WriteHandle {
}
private boolean isWriteSetWritable(DistributionSchedule.WriteSet writeSet,
- long key, int allowedNonWritableCount) {
+ int allowedNonWritableCount) {
if (allowedNonWritableCount < 0) {
allowedNonWritableCount = 0;
}
@@ -1224,7 +1224,7 @@ public class LedgerHandle implements WriteHandle {
int nonWritableCount = 0;
List<BookieId> currentEnsemble = getCurrentEnsemble();
for (int i = 0; i < sz; i++) {
- if
(!clientCtx.getBookieClient().isWritable(currentEnsemble.get(i), key)) {
+ if
(!clientCtx.getBookieClient().isWritable(currentEnsemble.get(i), ledgerId)) {
nonWritableCount++;
if (nonWritableCount >= allowedNonWritableCount) {
return false;
@@ -1239,21 +1239,21 @@ public class LedgerHandle implements WriteHandle {
return true;
}
- protected boolean waitForWritable(DistributionSchedule.WriteSet writeSet,
long key,
+ protected boolean waitForWritable(DistributionSchedule.WriteSet writeSet,
int allowedNonWritableCount, long
durationMs) {
if (durationMs < 0) {
return true;
}
final long startTime = MathUtils.nowInNano();
- boolean success = isWriteSetWritable(writeSet, key,
allowedNonWritableCount);
+ boolean success = isWriteSetWritable(writeSet,
allowedNonWritableCount);
if (!success && durationMs > 0) {
int backoff = 1;
final int maxBackoff = 4;
final long deadline = startTime +
TimeUnit.MILLISECONDS.toNanos(durationMs);
- while (!isWriteSetWritable(writeSet, key,
allowedNonWritableCount)) {
+ while (!isWriteSetWritable(writeSet, allowedNonWritableCount)) {
if (MathUtils.nowInNano() < deadline) {
long maxSleep = MathUtils.elapsedMSec(startTime);
if (maxSleep < 0) {
@@ -1265,7 +1265,7 @@ public class LedgerHandle implements WriteHandle {
TimeUnit.MILLISECONDS.sleep(sleepMs);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- success = isWriteSetWritable(writeSet, key,
allowedNonWritableCount);
+ success = isWriteSetWritable(writeSet,
allowedNonWritableCount);
break;
}
if (backoff <= maxBackoff) {
@@ -1340,7 +1340,7 @@ public class LedgerHandle implements WriteHandle {
DistributionSchedule.WriteSet ws =
distributionSchedule.getWriteSet(op.getEntryId());
try {
- if (!waitForWritable(ws, op.getEntryId(), 0,
clientCtx.getConf().waitForWriteSetMs)) {
+ if (!waitForWritable(ws, 0,
clientCtx.getConf().waitForWriteSetMs)) {
op.allowFailFastOnUnwritableChannel();
}
} finally {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
index 2ea0e0a..1bdc653 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
@@ -264,7 +264,7 @@ public class LedgerHandleAdv extends LedgerHandle
implements WriteAdvHandle {
}
if (!waitForWritable(distributionSchedule.getWriteSet(op.getEntryId()),
- op.getEntryId(), 0,
clientCtx.getConf().waitForWriteSetMs)) {
+ 0, clientCtx.getConf().waitForWriteSetMs)) {
op.allowFailFastOnUnwritableChannel();
}