This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new fbfd944  Algorithm to find start point of compacted ledger (#1205)
fbfd944 is described below

commit fbfd9445ee468faae7f56aeab69e93794bf4a033
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Mon Feb 12 23:14:59 2018 +0100

    Algorithm to find start point of compacted ledger (#1205)
    
    * Algorithm to find start point of compacted ledger
    
    When reading from a compacted topic ledger, the reader will have to find
    the first entry whose message id is greater than or equal to the
    position of the cursor. This involves a binary search into the ledger.
    
    This patch implements this binary search, along with basic caching to
    avoid rereading the same entry multiple times.
    
    * Guava cache -> Caffeine
    
    * rejig future callbacks
---
 .../apache/pulsar/client/impl/RawMessageImpl.java  |   2 +-
 .../pulsar/compaction/CompactedTopicImpl.java      |  90 +++++++++
 .../pulsar/compaction/CompactedTopicTest.java      | 214 +++++++++++++++++++++
 3 files changed, 305 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawMessageImpl.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawMessageImpl.java
index d1b6e14..d36520c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawMessageImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawMessageImpl.java
@@ -38,7 +38,7 @@ public class RawMessageImpl implements RawMessage {
     private final MessageIdData id;
     private final ByteBuf headersAndPayload;
 
-    RawMessageImpl(MessageIdData id, ByteBuf headersAndPayload) {
+    /**
+     * Create a raw message from a message id and a buffer holding the headers and payload.
+     * Public so that code outside this package (e.g. the compaction tests) can build raw messages.
+     * The message keeps its own reference to {@code headersAndPayload} through
+     * {@link ByteBuf#retainedSlice()}; the caller's reference to the buffer is left untouched.
+     */
+    public RawMessageImpl(MessageIdData id, ByteBuf headersAndPayload) {
         this.id = id;
         this.headersAndPayload = headersAndPayload.retainedSlice();
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index 068ab49..0afccd1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -18,10 +18,100 @@
  */
 package org.apache.pulsar.compaction;
 
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.google.common.collect.ComparisonChain;
+
+import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.client.api.RawMessage;
+import org.apache.pulsar.client.impl.RawMessageImpl;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class CompactedTopicImpl implements CompactedTopic {
+    private static final Logger log = LoggerFactory.getLogger(CompactedTopicImpl.class);
+
+    /**
+     * Sentinel returned by {@link #findStartPoint} when no entry of the compacted ledger has a
+     * message id greater than the searched position, i.e. there is nothing left to read from it.
+     */
+    final static long NEWER_THAN_COMPACTED = -0xfeed0fbaL;
+
     @Override
     public void newCompactedLedger(Position p, long compactedLedgerId) {}
+
+    /**
+     * Find the entry id at which a reader positioned at {@code p} should start reading the
+     * compacted ledger: the first entry whose message id is strictly greater than {@code p}.
+     *
+     * <p>The search is a binary search, so entries 0..lastEntryId are assumed to be sorted by
+     * message id.
+     *
+     * @param p the position to search from
+     * @param lastEntryId the last entry id of the compacted ledger; a negative value means the
+     *        ledger is empty
+     * @param cache maps compacted-ledger entry ids to the message id stored in that entry
+     * @return a future completing with the entry id to start reading from, or with
+     *         {@link #NEWER_THAN_COMPACTED} if no entry is newer than {@code p}; the future
+     *         completes exceptionally if a needed entry could not be read
+     */
+    static CompletableFuture<Long> findStartPoint(PositionImpl p,
+                                                  long lastEntryId,
+                                                  AsyncLoadingCache<Long,MessageIdData> cache) {
+        CompletableFuture<Long> promise = new CompletableFuture<>();
+        if (lastEntryId < 0) {
+            // Empty ledger: nothing can be newer than p, and searching would request entry -1.
+            promise.complete(NEWER_THAN_COMPACTED);
+        } else {
+            findStartPointLoop(p, 0, lastEntryId, promise, cache);
+        }
+        return promise;
+    }
+
+    /**
+     * One step of the asynchronous binary search over the inclusive entry range [start, end].
+     * Loads the start, middle and end entries through the cache, then either completes
+     * {@code promise} or recurses into the half that must contain the answer.
+     */
+    private static void findStartPointLoop(PositionImpl p, long start, long end,
+                                           CompletableFuture<Long> promise,
+                                           AsyncLoadingCache<Long,MessageIdData> cache) {
+        // Written as start + half-width so that start + end cannot overflow.
+        long midpoint = start + ((end - start) / 2);
+
+        CompletableFuture<MessageIdData> startEntry = cache.get(start);
+        CompletableFuture<MessageIdData> middleEntry = cache.get(midpoint);
+        CompletableFuture<MessageIdData> endEntry = cache.get(end);
+
+        CompletableFuture.allOf(startEntry, middleEntry, endEntry).thenRun(
+                () -> {
+                    if (comparePositionAndMessageId(p, startEntry.join()) < 0) {
+                        // Even the first entry of the range is newer than p.
+                        promise.complete(start);
+                    } else if (comparePositionAndMessageId(p, middleEntry.join()) < 0) {
+                        // start <= p < middle: the answer lies in (start, midpoint].
+                        findStartPointLoop(p, start, midpoint, promise, cache);
+                    } else if (comparePositionAndMessageId(p, endEntry.join()) < 0) {
+                        // middle <= p < end: the answer lies in (midpoint, end].
+                        findStartPointLoop(p, midpoint + 1, end, promise, cache);
+                    } else {
+                        // p is at or past the last entry of the range.
+                        promise.complete(NEWER_THAN_COMPACTED);
+                    }
+                }).exceptionally((exception) -> {
+                        // Covers both failed entry reads and exceptions thrown by the step above.
+                        promise.completeExceptionally(exception);
+                        return null;
+                    });
+    }
+
+    /**
+     * Create a cache from compacted-ledger entry id to the message id stored in that entry.
+     * Missing entries are loaded by reading them from {@code lh}.
+     *
+     * @param lh handle of the compacted ledger to read from
+     * @param maxSize maximum number of entries kept in the cache
+     */
+    static AsyncLoadingCache<Long,MessageIdData> createCache(LedgerHandle lh,
+                                                             long maxSize) {
+        return Caffeine.newBuilder()
+            .maximumSize(maxSize)
+            .buildAsync((entryId, executor) -> readOneMessageId(lh, entryId));
+    }
+
+    /**
+     * Read a single entry from {@code lh} and extract the message id stored in it.
+     * The returned future always completes: with the message id, or exceptionally if the read
+     * fails, the entry is missing, or the entry cannot be deserialized.
+     */
+    private static CompletableFuture<MessageIdData> readOneMessageId(LedgerHandle lh, long entryId) {
+        CompletableFuture<MessageIdData> promise = new CompletableFuture<>();
+
+        lh.asyncReadEntries(entryId, entryId,
+                            (rc, _lh, seq, ctx) -> {
+                                if (rc != BKException.Code.OK) {
+                                    promise.completeExceptionally(BKException.create(rc));
+                                    return;
+                                }
+                                if (!seq.hasMoreElements()) {
+                                    log.error("No such entry {} in ledger {}", entryId, lh.getId());
+                                    promise.completeExceptionally(new NoSuchElementException(
+                                            String.format("No such entry %d in ledger %d", entryId, lh.getId())));
+                                    return;
+                                }
+                                LedgerEntry entry = seq.nextElement();
+                                try (RawMessage m = RawMessageImpl.deserializeFrom(entry.getEntryBuffer())) {
+                                    promise.complete(m.getMessageIdData());
+                                } catch (Exception e) {
+                                    // Anything thrown here would otherwise escape into the BookKeeper
+                                    // callback and leave the promise (and its cache entry) pending forever.
+                                    log.error("Error reading message id of entry {} in ledger {}",
+                                              entryId, lh.getId(), e);
+                                    promise.completeExceptionally(e);
+                                } finally {
+                                    // The raw message holds its own retained slice; drop the entry's reference.
+                                    entry.getEntryBuffer().release();
+                                }
+                            }, null);
+        return promise;
+    }
+
+    /**
+     * Compare a managed-ledger position with a message id, ordering by ledger id, then entry id.
+     *
+     * @return a negative value, zero or a positive value if {@code p} is before, equal to or
+     *         after {@code m}
+     */
+    private static int comparePositionAndMessageId(PositionImpl p, MessageIdData m) {
+        return ComparisonChain.start()
+            .compare(p.getLedgerId(), m.getLedgerId())
+            .compare(p.getEntryId(), m.getEntryId()).result();
+    }
 }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
new file mode 100644
index 0000000..04d4bbc
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import static org.apache.pulsar.client.impl.RawReaderTest.extractKey;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.tuple.Triple;
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import 
org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.common.api.Commands;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.PropertyAdmin;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.RawMessage;
+import org.apache.pulsar.client.impl.RawMessageImpl;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class CompactedTopicTest extends MockedPulsarServiceBaseTest {
+    private static final Logger log = LoggerFactory.getLogger(CompactedTopicTest.class);
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        super.internalSetup();
+
+        admin.clusters().createCluster("use",
+                new ClusterData("http://127.0.0.1:" + BROKER_WEBSERVICE_PORT));
+        admin.properties().createProperty("my-property",
+                new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
+        admin.namespaces().createNamespace("my-property/use/my-ns");
+    }
+
+    @AfterMethod
+    @Override
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    /**
+     * Build a compacted ledger of {@code count} dummy messages whose message ids increase with
+     * random gaps between them.
+     *
+     * @return a triple of (1) the id of the ledger, (2) the message id of each written entry paired
+     *         with the entry id it was written to, and (3) message ids that fall in the gaps between
+     *         written ids, each paired with the entry id expected as the start point for it (the
+     *         first entry written after the gap)
+     */
+    private Triple<Long, List<Pair<MessageIdData,Long>>, List<Pair<MessageIdData,Long>>>
+        buildCompactedLedger(BookKeeper bk, int seed, int count)
+            throws Exception {
+        Random r = new Random(seed);
+        LedgerHandle lh = bk.createLedger(1, 1,
+                                          Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
+                                          Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
+        List<Pair<MessageIdData,Long>> positions = new ArrayList<>();
+        List<Pair<MessageIdData,Long>> idsInGaps = new ArrayList<>();
+        ByteBuf emptyBuffer = Unpooled.buffer(0);
+
+        AtomicLong ledgerIds = new AtomicLong(10L);
+        AtomicLong entryIds = new AtomicLong(0L);
+        CompletableFuture.allOf(
+                IntStream.range(0, count).mapToObj((i) -> {
+                        List<MessageIdData> idsInGap = new ArrayList<MessageIdData>();
+                        if (r.nextInt(10) == 1) {
+                            // Occasionally jump to a later ledger; the id just after the previous
+                            // entry in the old ledger then lies in the gap.
+                            long ledgerDelta = r.nextInt(10) + 1;
+                            idsInGap.add(MessageIdData.newBuilder()
+                                         .setLedgerId(ledgerIds.get())
+                                         .setEntryId(entryIds.get() + 1)
+                                         .build());
+                            ledgerIds.addAndGet(ledgerDelta);
+                            entryIds.set(0);
+                        }
+                        // Skip 0..4 entry ids before the next written id.
+                        long delta = r.nextInt(5);
+                        if (delta != 0) {
+                            idsInGap.add(MessageIdData.newBuilder()
+                                         .setLedgerId(ledgerIds.get())
+                                         .setEntryId(entryIds.get() + 1)
+                                         .build());
+                        }
+                        MessageIdData id = MessageIdData.newBuilder()
+                            .setLedgerId(ledgerIds.get())
+                            .setEntryId(entryIds.addAndGet(delta + 1)).build();
+                        RawMessage m = new RawMessageImpl(id, emptyBuffer);
+
+                        CompletableFuture<Void> f = new CompletableFuture<>();
+                        lh.asyncAddEntry(m.serialize(),
+                                (rc, ledger, eid, ctx) -> {
+                                     if (rc != BKException.Code.OK) {
+                                         f.completeExceptionally(BKException.create(rc));
+                                     } else {
+                                         positions.add(Pair.of(id, eid));
+                                         idsInGap.forEach((gid) -> idsInGaps.add(Pair.of(gid, eid)));
+                                         f.complete(null);
+                                     }
+                                }, null);
+                        return f;
+                    }).toArray(CompletableFuture[]::new)).get();
+        lh.close();
+
+        return Triple.of(lh.getId(), positions, idsInGaps);
+    }
+
+    @Test
+    public void testEntryLookup() throws Exception {
+        BookKeeper bk = pulsar.getBookKeeperClientFactory().create(
+                this.conf, null);
+
+        Triple<Long, List<Pair<MessageIdData, Long>>, List<Pair<MessageIdData, Long>>> compactedLedgerData
+            = buildCompactedLedger(bk, 0, 500);
+
+        List<Pair<MessageIdData, Long>> positions = compactedLedgerData.getMiddle();
+        List<Pair<MessageIdData, Long>> idsInGaps = compactedLedgerData.getRight();
+
+        LedgerHandle lh = bk.openLedger(compactedLedgerData.getLeft(),
+                                        Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
+                                        Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
+        long lastEntryId = lh.getLastAddConfirmed();
+        AsyncLoadingCache<Long,MessageIdData> cache = CompactedTopicImpl.createCache(lh, 50);
+
+        MessageIdData firstPositionId = positions.get(0).getLeft();
+        Pair<MessageIdData, Long> lastPosition = positions.get(positions.size() - 1);
+
+        // check ids before and after all ids in the compacted ledger
+        Assert.assertEquals(CompactedTopicImpl.findStartPoint(new PositionImpl(0, 0), lastEntryId, cache).get(),
+                            Long.valueOf(0));
+        Assert.assertEquals(CompactedTopicImpl.findStartPoint(new PositionImpl(Long.MAX_VALUE, 0),
+                                                              lastEntryId, cache).get(),
+                            Long.valueOf(CompactedTopicImpl.NEWER_THAN_COMPACTED));
+
+        // generated entry ids start at 1, so entry 0 of the first ledger is never in the compacted ledger
+        Assert.assertEquals(CompactedTopicImpl.findStartPoint(new PositionImpl(firstPositionId.getLedgerId(), 0),
+                                                              lastEntryId, cache).get(),
+                            Long.valueOf(0));
+        // check the id just after the last id in the compacted ledger
+        Assert.assertEquals(CompactedTopicImpl.findStartPoint(new PositionImpl(lastPosition.getLeft().getLedgerId(),
+                                                                               lastPosition.getLeft().getEntryId() + 1),
+                                                              lastEntryId, cache).get(),
+                            Long.valueOf(CompactedTopicImpl.NEWER_THAN_COMPACTED));
+
+        // shuffle so lookups hit the (small) cache in random order
+        Collections.shuffle(positions);
+        Collections.shuffle(idsInGaps);
+
+        // ids that are in the compacted ledger: the start point is the entry after them
+        for (Pair<MessageIdData, Long> p : positions) {
+            PositionImpl pos = new PositionImpl(p.getLeft().getLedgerId(), p.getLeft().getEntryId());
+            if (p.equals(lastPosition)) {
+                Assert.assertEquals(CompactedTopicImpl.findStartPoint(pos, lastEntryId, cache).get(),
+                                    Long.valueOf(CompactedTopicImpl.NEWER_THAN_COMPACTED));
+            } else {
+                Assert.assertEquals(CompactedTopicImpl.findStartPoint(pos, lastEntryId, cache).get(),
+                                    Long.valueOf(p.getRight() + 1));
+            }
+        }
+
+        // ids that fall in the gaps: the start point is the first entry after the gap
+        for (Pair<MessageIdData, Long> gap : idsInGaps) {
+            PositionImpl pos = new PositionImpl(gap.getLeft().getLedgerId(), gap.getLeft().getEntryId());
+            Assert.assertEquals(CompactedTopicImpl.findStartPoint(pos, lastEntryId, cache).get(),
+                                Long.valueOf(gap.getRight()));
+        }
+    }
+}

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to