This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 29257d0  If cursor update to BK fails, fallback to meta store (#1461)
29257d0 is described below

commit 29257d0747a7f044f5e46b55db973eed0e15f1f9
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Wed Mar 28 00:23:28 2018 -0700

    If cursor update to BK fails, fallback to meta store (#1461)
    
    * If cursor update to BK fails, fallback to meta store
    
    * Fixed missing parameters in logs
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 22 +++++++++++++++++-
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 26 +++++++++++++++++++---
 .../mledger/impl/ManagedLedgerErrorsTest.java      |  1 +
 .../mledger/impl/NonDurableCursorTest.java         |  8 ++-----
 4 files changed, 47 insertions(+), 10 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 45fcf90..c2fbbab 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1820,6 +1820,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                 new MetaStoreCallback<Void>() {
                     @Override
                     public void operationComplete(Void result, Stat stat) {
+                        cursorLedgerStat = stat;
                         callback.operationComplete(result, stat);
                     }
 
@@ -2055,7 +2056,26 @@ public class ManagedCursorImpl implements ManagedCursor {
                 // If we've had a write error, the ledger will be 
automatically closed, we need to create a new one,
                 // in the meantime the mark-delete will be queued.
                 STATE_UPDATER.compareAndSet(ManagedCursorImpl.this, 
State.Open, State.NoLedger);
-                callback.operationFailed(createManagedLedgerException(rc));
+
+                // Before giving up, try to persist the position in the 
metadata store
+                persistPositionMetaStore(-1, position, mdEntry.properties, new 
MetaStoreCallback<Void>() {
+                    @Override
+                    public void operationComplete(Void result, Stat stat) {
+                        if (log.isDebugEnabled()) {
+                            log.debug(
+                                    "[{}][{}] Updated cursor in meta store 
after previous failure in ledger at position {}",
+                                    ledger.getName(), name, position);
+                        }
+                        callback.operationComplete();
+                    }
+
+                    @Override
+                    public void operationFailed(MetaStoreException e) {
+                        log.warn("[{}][{}] Failed to update cursor in meta 
store after previous failure in ledger: {}",
+                                ledger.getName(), name, e.getMessage());
+                        
callback.operationFailed(createManagedLedgerException(rc));
+                    }
+                }, true);
             }
         }, null);
     }
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 312b894..ad85a35 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -64,6 +64,7 @@ import org.apache.bookkeeper.mledger.impl.MetaStore.Stat;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.apache.zookeeper.KeeperException.Code;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.Test;
@@ -409,11 +410,29 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
         stopBookKeeper();
         assertEquals(entries.size(), 1);
 
+        // Mark-delete should succeed if BK is down
+        cursor.markDelete(entries.get(0).getPosition());
+
+        entries.forEach(e -> e.release());
+    }
+
+    @Test(timeOut = 20000)
+    void markDeleteWithZKErrors() throws Exception {
+        ManagedLedger ledger = factory.open("my_test_ledger");
+        ManagedCursor cursor = ledger.openCursor("c1");
+        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
+        List<Entry> entries = cursor.readEntries(100);
+
+        assertEquals(entries.size(), 1);
+
+        stopBookKeeper();
+        stopZooKeeper();
+
         try {
             cursor.markDelete(entries.get(0).getPosition());
-            fail("call should have failed");
-        } catch (ManagedLedgerException e) {
-            // ok
+            fail("Should have failed");
+        } catch (Exception e) {
+            // Expected
         }
 
         entries.forEach(e -> e.release());
@@ -1022,6 +1041,7 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
         ManagedLedger ledger = factory.open("my_test_ledger");
 
         bkc.failAfter(1, BKException.Code.NotEnoughBookiesException);
+        zkc.failNow(Code.SESSIONEXPIRED);
         try {
             ledger.openCursor("c1");
             fail("should have failed");
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java
index 5004d28..4efeefb 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java
@@ -486,6 +486,7 @@ public class ManagedLedgerErrorsTest extends 
MockedBookKeeperTestCase {
         Position position = ledger.addEntry("entry".getBytes());
 
         bkc.failNow(BKException.Code.BookieHandleNotAvailableException);
+        zkc.failNow(Code.CONNECTIONLOSS);
 
         try {
             cursor.markDelete(position);
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
index afc751c..ff99da3 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
@@ -252,12 +252,8 @@ public class NonDurableCursorTest extends 
MockedBookKeeperTestCase {
         stopBookKeeper();
         assertEquals(entries.size(), 1);
 
-        try {
-            cursor.markDelete(entries.get(0).getPosition());
-            fail("call should have failed");
-        } catch (ManagedLedgerException e) {
-            // ok
-        }
+        // Mark-delete should succeed if BK is down
+        cursor.markDelete(entries.get(0).getPosition());
 
         entries.forEach(e -> e.release());
     }

-- 
To stop receiving notification emails like this one, please contact
si...@apache.org.

Reply via email to