This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 770ddf0  Compaction sets read compacted flag when reading (#1540)
770ddf0 is described below

commit 770ddf088c924f6e6c992ba583b4d7138186d223
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Tue Apr 10 23:09:01 2018 +0200

    Compaction sets read compacted flag when reading (#1540)
    
    Previously compaction would read the whole backlog again, even though
    what was in the compacted topic ledger was sufficient for its
    purposes. This patch flips the switch.
---
 .../apache/pulsar/client/impl/RawReaderImpl.java   |   1 +
 .../broker/auth/MockedPulsarServiceBaseTest.java   |   4 +-
 .../apache/pulsar/compaction/CompactionTest.java   | 112 ++++++++++++++++++++-
 3 files changed, 114 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 519b3b4..4a91477 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -54,6 +54,7 @@ public class RawReaderImpl implements RawReader {
         consumerConfiguration.setSubscriptionName(subscription);
         consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive);
         
consumerConfiguration.setReceiverQueueSize(DEFAULT_RECEIVER_QUEUE_SIZE);
+        consumerConfiguration.setReadCompacted(true);
 
         consumer = new RawConsumerImpl(client, consumerConfiguration,
                                        consumerFuture);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index d3c2ea1..72b8b74 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -222,11 +222,11 @@ public abstract class MockedPulsarServiceBaseTest {
     }
 
     public static NonClosableMockBookKeeper createMockBookKeeper(ZooKeeper 
zookeeper) throws Exception {
-        return new NonClosableMockBookKeeper(new ClientConfiguration(), 
zookeeper);
+        return spy(new NonClosableMockBookKeeper(new ClientConfiguration(), 
zookeeper));
     }
 
     // Prevent the MockBookKeeper instance from being closed when the broker 
is restarted within a test
-    private static class NonClosableMockBookKeeper extends MockBookKeeper {
+    public static class NonClosableMockBookKeeper extends MockBookKeeper {
 
         public NonClosableMockBookKeeper(ClientConfiguration conf, ZooKeeper 
zk) throws Exception {
             super(zk);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 22e74f2..8f64ef5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -18,18 +18,26 @@
  */
 package org.apache.pulsar.compaction;
 
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.api.OpenBuilder;
+import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageBuilder;
@@ -40,16 +48,20 @@ import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.PropertyAdmin;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 public class CompactionTest extends MockedPulsarServiceBaseTest {
+    private static final Logger log = 
LoggerFactory.getLogger(CompactionTest.class);
+
     private ScheduledExecutorService compactionScheduler;
     private BookKeeper bk;
 
@@ -580,4 +592,102 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         }
     }
 
+
+    @Test
+    public void testCompactorReadsCompacted() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+        // capture opened ledgers
+        Set<Long> ledgersOpened = Sets.newConcurrentHashSet();
+        when(mockBookKeeper.newOpenLedgerOp()).thenAnswer(
+                (invocation) -> {
+                    OpenBuilder builder = 
(OpenBuilder)spy(invocation.callRealMethod());
+                    when(builder.withLedgerId(anyLong())).thenAnswer(
+                            (invocation2) -> {
+                                
ledgersOpened.add((Long)invocation2.getArguments()[0]);
+                                return invocation2.callRealMethod();
+                            });
+                    return builder;
+                });
+
+        // subscribe before sending anything, so that we get all messages in 
sub1
+        
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close();
+
+        // create the topic on the broker
+        try (Producer producerNormal = 
pulsarClient.newProducer().topic(topic).create()) {
+            producerNormal.send(MessageBuilder.create()
+                                .setKey("key0")
+                                
.setContent("my-message-0".getBytes()).build());
+        }
+
+        // force ledger roll
+        pulsar.getBrokerService().getTopicReference(topic).get().close().get();
+
+        // write a message to avoid issue #1517
+        try (Producer producerNormal = 
pulsarClient.newProducer().topic(topic).create()) {
+            producerNormal.send(MessageBuilder.create()
+                                .setKey("key1")
+                                
.setContent("my-message-1".getBytes()).build());
+        }
+
+        // verify second ledger created
+        String managedLedgerName = 
((PersistentTopic)pulsar.getBrokerService().getTopicReference(topic).get())
+            .getManagedLedger().getName();
+        ManagedLedgerInfo info = 
pulsar.getManagedLedgerFactory().getManagedLedgerInfo(managedLedgerName);
+        Assert.assertEquals(info.ledgers.size(), 2);
+        Assert.assertTrue(ledgersOpened.isEmpty()); // no ledgers should have 
been opened
+
+        // compact the topic
+        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, 
compactionScheduler);
+        compactor.compact(topic).get();
+
+        // should have opened all except last to read
+        
Assert.assertTrue(ledgersOpened.contains(info.ledgers.get(0).ledgerId));
+        
Assert.assertFalse(ledgersOpened.contains(info.ledgers.get(1).ledgerId));
+        ledgersOpened.clear();
+
+        // force broker to close resources for topic
+        pulsar.getBrokerService().getTopicReference(topic).get().close().get();
+
+        // write a message to avoid issue #1517
+        try (Producer producerNormal = 
pulsarClient.newProducer().topic(topic).create()) {
+            producerNormal.send(MessageBuilder.create()
+                                .setKey("key2")
+                                
.setContent("my-message-2".getBytes()).build());
+        }
+
+        info = 
pulsar.getManagedLedgerFactory().getManagedLedgerInfo(managedLedgerName);
+        Assert.assertEquals(info.ledgers.size(), 3);
+
+        // should only have opened the penultimate ledger to get stat
+        
Assert.assertFalse(ledgersOpened.contains(info.ledgers.get(0).ledgerId));
+        
Assert.assertFalse(ledgersOpened.contains(info.ledgers.get(1).ledgerId));
+        
Assert.assertFalse(ledgersOpened.contains(info.ledgers.get(2).ledgerId));
+        ledgersOpened.clear();
+
+        // compact the topic again
+        compactor.compact(topic).get();
+
+        // shouldn't have opened first ledger (already compacted), penultimate 
would have some uncompacted data.
+        // last ledger already open for writing
+        
Assert.assertFalse(ledgersOpened.contains(info.ledgers.get(0).ledgerId));
+        
Assert.assertTrue(ledgersOpened.contains(info.ledgers.get(1).ledgerId));
+        
Assert.assertFalse(ledgersOpened.contains(info.ledgers.get(2).ledgerId));
+
+        // all three messages should be there when we read compacted
+        try (Consumer consumer = pulsarClient.newConsumer().topic(topic)
+                .subscriptionName("sub1").readCompacted(true).subscribe()){
+            Message message1 = consumer.receive();
+            Assert.assertEquals(message1.getKey(), "key0");
+            Assert.assertEquals(new String(message1.getData()), 
"my-message-0");
+
+            Message message2 = consumer.receive();
+            Assert.assertEquals(message2.getKey(), "key1");
+            Assert.assertEquals(new String(message2.getData()), 
"my-message-1");
+
+            Message message3 = consumer.receive();
+            Assert.assertEquals(message3.getKey(), "key2");
+            Assert.assertEquals(new String(message3.getData()), 
"my-message-2");
+        }
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to