This is an automated email from the ASF dual-hosted git repository.
yong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 5870922754 [fix]Wrong error code(-107) of opening a deleted ledger
(#4657)
5870922754 is described below
commit 5870922754b9585f7d3592dcd0714a4da65d9d80
Author: fengyubiao <[email protected]>
AuthorDate: Tue Sep 2 12:26:50 2025 +0800
[fix]Wrong error code(-107) of opening a deleted ledger (#4657)
---
.../apache/bookkeeper/bookie/BookieException.java | 12 +++
.../bookkeeper/bookie/HandleFactoryImpl.java | 2 +-
.../bookkeeper/proto/PerChannelBookieClient.java | 5 +
.../bookkeeper/proto/ReadEntryProcessor.java | 2 +-
.../bookkeeper/proto/ReadEntryProcessorV3.java | 5 +-
.../bookkeeper/proto/WriteEntryProcessor.java | 4 +-
.../bookkeeper/proto/WriteEntryProcessorV3.java | 4 +-
.../bookkeeper/proto/WriteLacProcessorV3.java | 4 +
.../org/apache/bookkeeper/client/TestFencing.java | 113 +++++++++++++++++++++
9 files changed, 143 insertions(+), 8 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java
index 2b85961cf4..d88673d007 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java
@@ -69,6 +69,8 @@ public abstract class BookieException extends Exception {
return new MetadataStoreException();
case Code.UnknownBookieIdException:
return new UnknownBookieIdException();
+ case Code.LedgerFencedAndDeletedException:
+ return new LedgerFencedAndDeletedException();
case Code.DataUnknownException:
return new DataUnknownException();
default:
@@ -95,6 +97,7 @@ public abstract class BookieException extends Exception {
int CookieExistsException = -109;
int EntryLogMetadataMapException = -110;
int DataUnknownException = -111;
+ int LedgerFencedAndDeletedException = -112;
}
public int getCode() {
@@ -199,6 +202,15 @@ public abstract class BookieException extends Exception {
}
}
+ /**
+ * Signals that a ledger was recently fenced and then deleted on this bookie.
Reported with its own code so it is not mistaken for a ledger that is only fenced.
+ */
+ public static class LedgerFencedAndDeletedException extends
BookieException {
+ public LedgerFencedAndDeletedException() {
+ super(Code.LedgerFencedAndDeletedException);
+ }
+ }
+
/**
* Signals that a ledger's operation has been rejected by an internal
component because of the resource saturation.
*/
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java
index b331f12506..ac87c3aed4 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java
@@ -57,7 +57,7 @@ class HandleFactoryImpl implements HandleFactory,
LedgerDeletionListener {
if (handle == null) {
if (!journalReplay &&
recentlyFencedAndDeletedLedgers.getIfPresent(ledgerId) != null) {
- throw
BookieException.create(BookieException.Code.LedgerFencedException);
+ throw
BookieException.create(BookieException.Code.LedgerFencedAndDeletedException);
}
handle = LedgerDescriptor.create(masterKey, ledgerId,
ledgerStorage);
ledgers.putIfAbsent(ledgerId, handle);
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 40712bf854..d2df91aa90 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -1194,6 +1194,11 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
completion.setOutstanding();
}
} else {
+ try {
+ future.get();
+ } catch (Exception ex) {
+ LOG.warn("Failed to request to the bookie: {}",
bookieId, ex);
+ }
nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime),
TimeUnit.NANOSECONDS);
errorOut(key);
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
index 3930b8ea75..38088a5c44 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
@@ -94,7 +94,7 @@ class ReadEntryProcessor extends
PacketProcessorBase<ReadRequest> {
handleReadResultForFenceRead(fenceResult, data,
startTimeNanos);
return;
}
- } catch (Bookie.NoLedgerException e) {
+ } catch (Bookie.NoLedgerException |
BookieException.LedgerFencedAndDeletedException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Error reading {}", request, e);
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
index 3a85ca0949..e4519ff20c 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
@@ -215,9 +215,10 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 {
}
}
return readEntry(readResponse, entryId, startTimeSw);
- } catch (Bookie.NoLedgerException e) {
+ } catch (Bookie.NoLedgerException |
BookieException.LedgerFencedAndDeletedException e) {
if (RequestUtils.isFenceRequest(readRequest)) {
- LOG.info("No ledger found reading entry {} when fencing ledger
{}", entryId, ledgerId);
+ LOG.info("No ledger found(or it has been deleted) reading
entry {} when fencing ledger {}",
+ entryId, ledgerId);
} else if (entryId != BookieProtocol.LAST_ADD_CONFIRMED) {
LOG.info("No ledger found while reading entry: {} from ledger:
{}", entryId, ledgerId);
} else if (LOG.isDebugEnabled()) {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
index 2fbc8ffb0f..7e8d0ff45e 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
@@ -90,8 +90,8 @@ class WriteEntryProcessor extends
PacketProcessorBase<ParsedAddRequest> implemen
} catch (IOException e) {
LOG.error("Error writing {}", request, e);
rc = BookieProtocol.EIO;
- } catch (BookieException.LedgerFencedException lfe) {
- LOG.warn("Write attempt on fenced ledger {} by client {}",
request.getLedgerId(),
+ } catch (BookieException.LedgerFencedException |
BookieException.LedgerFencedAndDeletedException lfe) {
+ LOG.warn("Write attempt on fenced/deleted ledger {} by client {}",
request.getLedgerId(),
requestHandler.ctx().channel().remoteAddress());
rc = BookieProtocol.EFENCED;
} catch (BookieException e) {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
index 3aaa9219f9..9c663269f8 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
@@ -136,8 +136,8 @@ class WriteEntryProcessorV3 extends PacketProcessorBaseV3 {
logger.error("Error writing entry:{} to ledger:{}",
entryId, ledgerId, e);
status = StatusCode.EIO;
- } catch (BookieException.LedgerFencedException e) {
- logger.error("Ledger fenced while writing entry:{} to ledger:{}",
+ } catch (BookieException.LedgerFencedException |
BookieException.LedgerFencedAndDeletedException e) {
+ logger.error("Ledger fenced/deleted while writing entry:{} to
ledger:{}",
entryId, ledgerId, e);
status = StatusCode.EFENCED;
} catch (BookieException e) {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
index 5590f5175c..5783da2e88 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
@@ -105,6 +105,10 @@ class WriteLacProcessorV3 extends PacketProcessorBaseV3
implements Runnable {
requestProcessor.bookie.setExplicitLac(Unpooled.wrappedBuffer(lacToAdd),
writeCallback, requestHandler, masterKey);
status = StatusCode.EOK;
+ } catch (BookieException.LedgerFencedAndDeletedException e) {
+ logger.error("Error saving lac {} for ledger:{}, which has been
deleted",
+ lac, ledgerId, e);
+ status = StatusCode.ENOLEDGER;
} catch (IOException e) {
logger.error("Error saving lac {} for ledger:{}",
lac, ledgerId, e);
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java
index 77382b4ebd..9f7a81450b 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java
@@ -24,6 +24,9 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import lombok.extern.slf4j.Slf4j;
@@ -33,10 +36,16 @@ import org.apache.bookkeeper.bookie.LedgerStorage;
import org.apache.bookkeeper.bookie.SortedLedgerStorage;
import org.apache.bookkeeper.bookie.storage.ldb.SingleDirectoryDbLedgerStorage;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.proto.BookieClientImpl;
+import org.apache.bookkeeper.proto.BookieProtocol;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
+import org.apache.bookkeeper.proto.PerChannelBookieClient;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.test.TestStatsProvider;
+import org.apache.bookkeeper.util.ByteBufList;
import org.awaitility.reflect.WhiteboxImpl;
import org.junit.Test;
import org.slf4j.Logger;
@@ -152,6 +161,110 @@ public class TestFencing extends
BookKeeperClusterTestCase {
}
}
+ @Test(timeout = 3000 * 1000)
+ public void testConcurrentFenceAndDeleteLedger() throws Exception {
+ LedgerHandle writeLedger;
+ writeLedger = bkc.createLedger(digestType, "password".getBytes());
+
+ String tmp = "BookKeeper is cool!";
+ long lac = 0;
+ for (int i = 0; i < 10; i++) {
+ long entryId = writeLedger.addEntry(tmp.getBytes());
+ LOG.info("entryId: {}", entryId);
+ lac = entryId;
+ }
+
+ // Fence and delete.
+ final BookieId bookieId =
writeLedger.getLedgerMetadata().getEnsembleAt(0).get(0);
+ ClientConfiguration clientConfiguration2 = newClientConfiguration();
+ clientConfiguration2.setUseV2WireProtocol(true);
+ ClientConfiguration clientConfiguration3 = newClientConfiguration();
+ BookKeeperTestClient bkcV2 = new
BookKeeperTestClient(clientConfiguration2, new TestStatsProvider());
+ LedgerHandle writeLedgerV2 = bkcV2.createLedger(digestType,
"password".getBytes());
+ BookKeeperTestClient bkcV3 = new
BookKeeperTestClient(clientConfiguration3, new TestStatsProvider());
+ LedgerHandle writeLedgerV3 = bkcV3.createLedger(digestType,
"password".getBytes());
+ ReadOnlyLedgerHandle readLedgerV2 =
+ (ReadOnlyLedgerHandle) bkcV2.openLedger(writeLedger.getId(),
digestType, "password".getBytes());
+ ReadOnlyLedgerHandle readLedgerV3 =
+ (ReadOnlyLedgerHandle) bkcV3.openLedger(writeLedger.getId(),
digestType, "password".getBytes());
+ BookieClientImpl bookieClientV2 = (BookieClientImpl)
readLedgerV2.clientCtx.getBookieClient();
+ BookieClientImpl bookieClientV3 = (BookieClientImpl)
readLedgerV3.clientCtx.getBookieClient();
+ // Trigger opening connection.
+ CompletableFuture<Integer> obtainV2 = new CompletableFuture<>();
+ bookieClientV2.lookupClient(bookieId).obtain(
+ new
BookkeeperInternalCallbacks.GenericCallback<PerChannelBookieClient>() {
+ @Override
+ public void operationComplete(int rc, PerChannelBookieClient
result) {
+ obtainV2.complete(rc);
+ }
+ }, writeLedger.getId());
+ assertEquals(BKException.Code.OK, obtainV2.get().intValue());
+ CompletableFuture<Integer> obtainV3 = new CompletableFuture<>();
+ bookieClientV3.lookupClient(bookieId).obtain(
+ new
BookkeeperInternalCallbacks.GenericCallback<PerChannelBookieClient>() {
+ @Override
+ public void operationComplete(int rc, PerChannelBookieClient
result) {
+ obtainV3.complete(rc);
+ }
+ }, writeLedger.getId());
+ assertEquals(BKException.Code.OK, obtainV3.get().intValue());
+ bkcV3.deleteLedger(readLedgerV3.ledgerId);
+
+ // Waiting for GC.
+ for (ServerTester server : servers) {
+ triggerGC(server.getServer().getBookie());
+ }
+
+ // Verify: read requests with V2 protocol will receive a
NoSuchLedgerException.
+ final byte readEntryFlagFencing = 1;
+ CompletableFuture<Integer> readResV2 = new CompletableFuture<>();
+ bookieClientV2.readEntry(bookieId,
+ writeLedger.getId(), 0, (rc, ledgerId, entryId1, buffer, ctx)
-> {
+ readResV2.complete(rc);
+ }, null, readEntryFlagFencing, readLedgerV2.ledgerKey);
+ assertEquals(BKException.Code.NoSuchLedgerExistsException,
readResV2.get().intValue());
+ // Verify: read requests with V3 protocol will receive a
NoSuchLedgerException.
+ CompletableFuture<Integer> readResV3 = new CompletableFuture<>();
+ bookieClientV3.readEntry(bookieId,
+ writeLedger.getId(), 0, (rc, ledgerId, entryId1, buffer, ctx)
-> {
+ readResV3.complete(rc);
+ }, null, readEntryFlagFencing, readLedgerV3.ledgerKey);
+ assertEquals(BKException.Code.NoSuchLedgerExistsException,
readResV3.get().intValue());
+ // Verify: add requests with V2 protocol will receive a
LedgerFencedException.
+ log.info("Try to add the next entry: {}:{}", writeLedger.getId(), lac
+ 1);
+ final ByteBuf dataV2 = UnpooledByteBufAllocator.DEFAULT.heapBuffer();
+ // Combine add request, and rewrite ledgerId of the request.
+ dataV2.writeByte(1);
+ final ByteBuf toSendV2 = (ByteBuf)
writeLedgerV2.macManager.computeDigestAndPackageForSending(
+ lac + 1, lac, 1, dataV2, writeLedger.ledgerKey,
BookieProtocol.FLAG_NONE);
+ toSendV2.setLong(28, writeLedger.getId());
+ CompletableFuture<Integer> addResV2 = new CompletableFuture<>();
+ bookieClientV2.addEntry(bookieId, writeLedger.getId(),
writeLedger.ledgerKey, lac + 1, toSendV2,
+ (rc, ledgerId, entryId1, addr, ctx) -> {
+ addResV2.complete(rc);
+ }, null, BookieProtocol.FLAG_NONE, false, WriteFlag.NONE);
+ assertEquals(BKException.Code.LedgerFencedException,
addResV2.get().intValue());
+ // Verify: add requests with V3 protocol will receive a
LedgerFencedException.
+ final ByteBuf dataV3 = UnpooledByteBufAllocator.DEFAULT.heapBuffer();
+ dataV3.writeByte(1);
+ // Combine add request, and rewrite ledgerId of the request.
+ final ByteBufList toSendV3 = (ByteBufList)
writeLedgerV3.macManager.computeDigestAndPackageForSending(
+ lac + 1, lac, 1, dataV3, writeLedger.ledgerKey,
BookieProtocol.FLAG_NONE);
+ toSendV3.getBuffer(0).setLong(0, writeLedger.getId());
+ CompletableFuture<Integer> addResV3 = new CompletableFuture<>();
+ bookieClientV3.addEntry(bookieId, writeLedger.getId(),
writeLedger.ledgerKey, lac + 1, toSendV3,
+ (rc, ledgerId, entryId1, addr, ctx) -> {
+ addResV3.complete(rc);
+ }, null, BookieProtocol.FLAG_NONE, false, WriteFlag.NONE);
+ assertEquals(BKException.Code.LedgerFencedException,
addResV3.get().intValue());
+
+ // cleanup.
+ writeLedgerV2.close();
+ writeLedgerV3.close();
+ bkcV2.close();
+ bkcV3.close();
+ }
+
private static int threadCount = 0;
class LedgerOpenThread extends Thread {