This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 92866a4  [MERGE YAHOO REPO] AsyncReadLastEntry should trigger callback with error when ledger is empty
92866a4 is described below

commit 92866a49de755b4dc1e171e25d56ad25aaaa90d0
Author: Jia Zhai <[email protected]>
AuthorDate: Sat Feb 17 00:39:52 2018 +0800

    [MERGE YAHOO REPO] AsyncReadLastEntry should trigger callback with error when ledger is empty
    
    Descriptions of the changes in this PR:
    This is a cherry-pick from the yahoo-4.3 branch of the Yahoo repo.
    
    The original change is:
    https://github.com/yahoo/bookkeeper/commit/9560ab0f
    AsyncReadLastEntry should trigger callback with error when ledger is empty
    
    Author: Jia Zhai <[email protected]>
    Author: Matteo Merli <[email protected]>
    Author: Matteo Merli <[email protected]>
    
    Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo <[email protected]>
    
    This closes #1121 from jiazhai/cherry_picks/i_167
---
 .../org/apache/bookkeeper/client/LedgerHandle.java |  32 +++++
 .../bookkeeper/client/ReadOnlyLedgerHandle.java    |  23 ++++
 .../bookkeeper/client/TestReadLastEntry.java       | 141 +++++++++++++++++++++
 3 files changed, 196 insertions(+)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index dc100c6..007f4ce 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -787,6 +787,38 @@ public class LedgerHandle implements WriteHandle {
         }
     }
 
+    /**
+     * Asynchronously read the last entry of this ledger.
+     * The entry id is {@code getLastAddConfirmed()}; when it is negative the callback
+     * receives {@code NoSuchEntryException} and a {@code null} entry sequence.
+     *
+     * @param cb  object implementing read callback interface
+     * @param ctx control object
+     */
+    public void asyncReadLastEntry(ReadCallback cb, Object ctx) {
+        final long lac = getLastAddConfirmed();
+        if (lac < 0) {
+            // Ledger was empty, so there is no last entry to read
+            cb.readComplete(BKException.Code.NoSuchEntryException, this, null, ctx);
+            return;
+        }
+        asyncReadEntriesInternal(lac, lac, cb, ctx);
+    }
+
+    /**
+     * Synchronously read the last entry (the one at {@code getLastAddConfirmed()}), waiting for the
+     * result through {@code SyncCallbackUtils.waitForResult}; an empty ledger yields BKNoSuchEntryException.
+     */
+    public LedgerEntry readLastEntry() throws InterruptedException, BKException {
+        final long lac = getLastAddConfirmed();
+        if (lac < 0) {
+            throw new BKException.BKNoSuchEntryException();
+        }
+        CompletableFuture<Enumeration<LedgerEntry>> pending = new CompletableFuture<>();
+        asyncReadEntries(lac, lac, new SyncReadCallback(pending), null);
+        return SyncCallbackUtils.waitForResult(pending).nextElement();
+    }
+
     CompletableFuture<LedgerEntries> readEntriesInternalAsync(long firstEntry,
                                                               long lastEntry) {
         PendingReadOp op = new PendingReadOp(this, bk.getScheduler(), 
firstEntry, lastEntry);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
index d1ab615..55b7b5c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
@@ -27,6 +27,8 @@ import java.util.concurrent.RejectedExecutionException;
 
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
+import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
+import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.api.WriteFlag;
 import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -178,4 +180,25 @@ class ReadOnlyLedgerHandle extends LedgerHandle implements LedgerMetadataListene
     protected void initializeExplicitLacFlushPolicy() {
         explicitLacFlushPolicy = 
ExplicitLacFlushPolicy.VOID_EXPLICITLAC_FLUSH_POLICY;
     }
+
+    /** Resolves the last confirmed entry id through asyncReadLastConfirmed, then reads that entry. */
+    @Override
+    public void asyncReadLastEntry(ReadCallback cb, Object ctx) {
+        asyncReadLastConfirmed(new ReadLastConfirmedCallback() {
+            @Override
+            public void readLastConfirmedComplete(int rc, long lastConfirmed, Object ctx) {
+                if (rc != BKException.Code.OK) {
+                    // The LAC lookup failed: log it and hand the same return code to the caller.
+                    LOG.error("ReadException in asyncReadLastEntry, ledgerId: {}, lac: {}, rc:{}",
+                        ledgerId, lastConfirmed, rc);
+                    cb.readComplete(rc, ReadOnlyLedgerHandle.this, null, ctx);
+                } else if (lastConfirmed < 0) {
+                    // Ledger was empty, so there is no last entry to read
+                    cb.readComplete(BKException.Code.NoSuchEntryException, ReadOnlyLedgerHandle.this, null, ctx);
+                } else {
+                    asyncReadEntriesInternal(lastConfirmed, lastConfirmed, cb, ctx);
+                }
+            }
+        }, ctx);
+    }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastEntry.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastEntry.java
new file mode 100644
index 0000000..35c0f3b
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastEntry.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
+import org.apache.bookkeeper.client.BKException.Code;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.Test;
+
+/**
+ * Test reading the last entry of a ledger, for both empty and non-empty ledgers.
+ */
+public class TestReadLastEntry extends BookKeeperClusterTestCase {
+
+    final DigestType digestType;
+
+    public TestReadLastEntry() {
+        super(1);
+        this.digestType = DigestType.CRC32;
+    }
+
+    /** An async read of the last entry of an empty ledger must complete with NoSuchEntryException. */
+    @Test
+    public void testTryReadLastEntryAsyncOnEmptyLedger() throws Exception {
+        final LedgerHandle lh = bkc.createLedger(1, 1, 1, digestType, "".getBytes());
+        lh.close();
+
+        LedgerHandle readLh = bkc.openLedger(lh.getId(), digestType, "".getBytes());
+
+        final CountDownLatch latch1 = new CountDownLatch(1);
+        final AtomicInteger rcStore = new AtomicInteger();
+        readLh.asyncReadLastEntry(new ReadCallback() {
+            @Override
+            public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
+                rcStore.set(rc);
+                latch1.countDown();
+            }
+        }, null);
+
+        latch1.await();
+
+        assertEquals(BKException.Code.NoSuchEntryException, rcStore.get());
+
+        readLh.close();
+    }
+
+    /** A sync read of the last entry of an empty ledger must fail with NoSuchEntryException. */
+    @Test
+    public void testTryReadLastEntryOnEmptyLedger() throws Exception {
+        final LedgerHandle lh = bkc.createLedger(1, 1, 1, digestType, "".getBytes());
+        lh.close();
+
+        LedgerHandle readLh = bkc.openLedger(lh.getId(), digestType, "".getBytes());
+        try {
+            readLh.readLastEntry();
+            fail("should fail with NoSuchEntryException");
+        } catch (BKException e) {
+            assertEquals(Code.NoSuchEntryException, e.getCode());
+        }
+
+        readLh.close();
+    }
+
+    /** An async read of the last entry must return the entry that was written last. */
+    @Test
+    public void testTryReadLastEntryAsync() throws Exception {
+        final LedgerHandle lh = bkc.createLedger(1, 1, 1, digestType, "".getBytes());
+        byte[] data = new byte[1024];
+        Arrays.fill(data, (byte) 'x');
+        for (int j = 0; j < 100; j++) {
+            data[1023] = (byte) j;
+            lh.addEntry(data);
+        }
+        lh.close();
+
+        LedgerHandle readLh = bkc.openLedger(lh.getId(), digestType, "".getBytes());
+        final CountDownLatch latch1 = new CountDownLatch(1);
+        final AtomicInteger rcStore = new AtomicInteger();
+        final AtomicInteger lastByteStore = new AtomicInteger();
+        readLh.asyncReadLastEntry(new ReadCallback() {
+            @Override
+            public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
+                rcStore.set(rc);
+                // Only touch seq on success; always release the latch so a failure cannot hang the test.
+                if (rc == BKException.Code.OK) {
+                    lastByteStore.set(seq.nextElement().getEntry()[1023]);
+                }
+                latch1.countDown();
+            }
+        }, null);
+
+        latch1.await();
+        assertEquals(BKException.Code.OK, rcStore.get());
+        assertEquals(data[1023], lastByteStore.byteValue());
+
+        readLh.close();
+    }
+
+    /** A sync read of the last entry must return the entry that was written last. */
+    @Test
+    public void testTryReadLastEntrySync() throws Exception {
+        final LedgerHandle lh = bkc.createLedger(1, 1, 1, digestType, "".getBytes());
+        byte[] data = new byte[1024];
+        Arrays.fill(data, (byte) 'x');
+        for (int j = 0; j < 100; j++) {
+            data[1023] = (byte) j;
+            lh.addEntry(data);
+        }
+        lh.close();
+
+        LedgerHandle readLh = bkc.openLedger(lh.getId(), digestType, "".getBytes());
+        LedgerEntry lastEntry = readLh.readLastEntry();
+
+        assertEquals((byte) 99, lastEntry.getEntry()[1023]);
+
+        readLh.close();
+    }
+}

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to