This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 29257d0 If cursor update to BK fails, fallback to meta store (#1461) 29257d0 is described below commit 29257d0747a7f044f5e46b55db973eed0e15f1f9 Author: Matteo Merli <mme...@apache.org> AuthorDate: Wed Mar 28 00:23:28 2018 -0700 If cursor update to BK fails, fallback to meta store (#1461) * If cursor update to BK fails, fallback to meta store * Fixed missing parameters in logs --- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 22 +++++++++++++++++- .../bookkeeper/mledger/impl/ManagedCursorTest.java | 26 +++++++++++++++++++--- .../mledger/impl/ManagedLedgerErrorsTest.java | 1 + .../mledger/impl/NonDurableCursorTest.java | 8 ++----- 4 files changed, 47 insertions(+), 10 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 45fcf90..c2fbbab 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1820,6 +1820,7 @@ public class ManagedCursorImpl implements ManagedCursor { new MetaStoreCallback<Void>() { @Override public void operationComplete(Void result, Stat stat) { + cursorLedgerStat = stat; callback.operationComplete(result, stat); } @@ -2055,7 +2056,26 @@ public class ManagedCursorImpl implements ManagedCursor { // If we've had a write error, the ledger will be automatically closed, we need to create a new one, // in the meantime the mark-delete will be queued. STATE_UPDATER.compareAndSet(ManagedCursorImpl.this, State.Open, State.NoLedger); - callback.operationFailed(createManagedLedgerException(rc)); + + // Before giving up, try to persist the position in the metadata store + persistPositionMetaStore(-1, position, mdEntry.properties, new MetaStoreCallback<Void>() { + @Override + public void operationComplete(Void result, Stat stat) { + if (log.isDebugEnabled()) { + log.debug( + "[{}][{}] Updated cursor in meta store after previous failure in ledger at position {}", + ledger.getName(), name, position); + } + callback.operationComplete(); + } + + @Override + public void operationFailed(MetaStoreException e) { + log.warn("[{}][{}] Failed to update cursor in meta store after previous failure in ledger: {}", + ledger.getName(), name, e.getMessage()); + callback.operationFailed(createManagedLedgerException(rc)); + } + }, true); } }, null); } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 312b894..ad85a35 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -64,6 +64,7 @@ import org.apache.bookkeeper.mledger.impl.MetaStore.Stat; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; +import org.apache.zookeeper.KeeperException.Code; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.Test; @@ -409,11 +410,29 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { stopBookKeeper(); assertEquals(entries.size(), 1); + // Mark-delete should succeed if BK is down + cursor.markDelete(entries.get(0).getPosition()); + + entries.forEach(e -> e.release()); + } + + @Test(timeOut = 20000) + void markDeleteWithZKErrors() throws Exception { + ManagedLedger ledger = factory.open("my_test_ledger"); + ManagedCursor cursor = ledger.openCursor("c1"); + ledger.addEntry("dummy-entry-1".getBytes(Encoding)); + List<Entry> entries = cursor.readEntries(100); + + assertEquals(entries.size(), 1); + + stopBookKeeper(); + stopZooKeeper(); + try { cursor.markDelete(entries.get(0).getPosition()); - fail("call should have failed"); - } catch (ManagedLedgerException e) { - // ok + fail("Should have failed"); + } catch (Exception e) { + // Expected } entries.forEach(e -> e.release()); @@ -1022,6 +1041,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { ManagedLedger ledger = factory.open("my_test_ledger"); bkc.failAfter(1, BKException.Code.NotEnoughBookiesException); + zkc.failNow(Code.SESSIONEXPIRED); try { ledger.openCursor("c1"); fail("should have failed"); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java index 5004d28..4efeefb 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java @@ -486,6 +486,7 @@ public class ManagedLedgerErrorsTest extends MockedBookKeeperTestCase { Position position = ledger.addEntry("entry".getBytes()); bkc.failNow(BKException.Code.BookieHandleNotAvailableException); + zkc.failNow(Code.CONNECTIONLOSS); try { cursor.markDelete(position); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java index afc751c..ff99da3 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java @@ -252,12 +252,8 @@ public class NonDurableCursorTest extends MockedBookKeeperTestCase { stopBookKeeper(); assertEquals(entries.size(), 1); - try { - cursor.markDelete(entries.get(0).getPosition()); - fail("call should have failed"); - } catch (ManagedLedgerException e) { - // ok - } + // Mark-delete should succeed if BK is down + cursor.markDelete(entries.get(0).getPosition()); entries.forEach(e -> e.release()); } -- To stop receiving notification emails like this one, please contact si...@apache.org.