This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 770ddf0 Compaction sets read compacted flag when reading (#1540) 770ddf0 is described below commit 770ddf088c924f6e6c992ba583b4d7138186d223 Author: Ivan Kelly <iv...@apache.org> AuthorDate: Tue Apr 10 23:09:01 2018 +0200 Compaction sets read compacted flag when reading (#1540) Previously compaction would read the whole backlog again, even though what was in the compacted topic ledger was sufficient for its purposes. This patch flips the switch. --- .../apache/pulsar/client/impl/RawReaderImpl.java | 1 + .../broker/auth/MockedPulsarServiceBaseTest.java | 4 +- .../apache/pulsar/compaction/CompactionTest.java | 112 ++++++++++++++++++++- 3 files changed, 114 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java index 519b3b4..4a91477 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java @@ -54,6 +54,7 @@ public class RawReaderImpl implements RawReader { consumerConfiguration.setSubscriptionName(subscription); consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive); consumerConfiguration.setReceiverQueueSize(DEFAULT_RECEIVER_QUEUE_SIZE); + consumerConfiguration.setReadCompacted(true); consumer = new RawConsumerImpl(client, consumerConfiguration, consumerFuture); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index d3c2ea1..72b8b74 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -222,11 +222,11 @@ public abstract class MockedPulsarServiceBaseTest { } public static NonClosableMockBookKeeper createMockBookKeeper(ZooKeeper zookeeper) throws Exception { - return new NonClosableMockBookKeeper(new ClientConfiguration(), zookeeper); + return spy(new NonClosableMockBookKeeper(new ClientConfiguration(), zookeeper)); } // Prevent the MockBookKeeper instance from being closed when the broker is restarted within a test - private static class NonClosableMockBookKeeper extends MockBookKeeper { + public static class NonClosableMockBookKeeper extends MockBookKeeper { public NonClosableMockBookKeeper(ClientConfiguration conf, ZooKeeper zk) throws Exception { super(zk); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index 22e74f2..8f64ef5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -18,18 +18,26 @@ */ package org.apache.pulsar.compaction; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.client.api.OpenBuilder; +import org.apache.bookkeeper.mledger.ManagedLedgerInfo; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageBuilder; @@ -40,16 +48,20 @@ import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.PropertyAdmin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ThreadFactoryBuilder; public class CompactionTest extends MockedPulsarServiceBaseTest { + private static final Logger log = LoggerFactory.getLogger(CompactionTest.class); + private ScheduledExecutorService compactionScheduler; private BookKeeper bk; @@ -580,4 +592,102 @@ public class CompactionTest extends MockedPulsarServiceBaseTest { } } + + @Test + public void testCompactorReadsCompacted() throws Exception { + String topic = "persistent://my-property/use/my-ns/my-topic1"; + + // capture opened ledgers + Set<Long> ledgersOpened = Sets.newConcurrentHashSet(); + when(mockBookKeeper.newOpenLedgerOp()).thenAnswer( + (invocation) -> { + OpenBuilder builder = (OpenBuilder)spy(invocation.callRealMethod()); + when(builder.withLedgerId(anyLong())).thenAnswer( + (invocation2) -> { + ledgersOpened.add((Long)invocation2.getArguments()[0]); + return invocation2.callRealMethod(); + }); + return builder; + }); + + // subscribe before sending anything, so that we get all messages in sub1 + pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close(); + + // create the topic on the broker + try (Producer producerNormal = pulsarClient.newProducer().topic(topic).create()) { + producerNormal.send(MessageBuilder.create() + .setKey("key0") + .setContent("my-message-0".getBytes()).build()); + } + + // force ledger roll + pulsar.getBrokerService().getTopicReference(topic).get().close().get(); + + // write a message to avoid issue #1517 + try (Producer producerNormal = pulsarClient.newProducer().topic(topic).create()) { + producerNormal.send(MessageBuilder.create() + .setKey("key1") + .setContent("my-message-1".getBytes()).build()); + } + + // verify second ledger created + String managedLedgerName = ((PersistentTopic)pulsar.getBrokerService().getTopicReference(topic).get()) + .getManagedLedger().getName(); + ManagedLedgerInfo info = pulsar.getManagedLedgerFactory().getManagedLedgerInfo(managedLedgerName); + Assert.assertEquals(info.ledgers.size(), 2); + Assert.assertTrue(ledgersOpened.isEmpty()); // no ledgers should have been opened + + // compact the topic + Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic).get(); + + // should have opened all except last to read + Assert.assertTrue(ledgersOpened.contains(info.ledgers.get(0).ledgerId)); + Assert.assertFalse(ledgersOpened.contains(info.ledgers.get(1).ledgerId)); + ledgersOpened.clear(); + + // force broker to close resources for topic + pulsar.getBrokerService().getTopicReference(topic).get().close().get(); + + // write a message to avoid issue #1517 + try (Producer producerNormal = pulsarClient.newProducer().topic(topic).create()) { + producerNormal.send(MessageBuilder.create() + .setKey("key2") + .setContent("my-message-2".getBytes()).build()); + } + + info = pulsar.getManagedLedgerFactory().getManagedLedgerInfo(managedLedgerName); + Assert.assertEquals(info.ledgers.size(), 3); + + // should only have opened the penultimate ledger to get stat + Assert.assertFalse(ledgersOpened.contains(info.ledgers.get(0).ledgerId)); + Assert.assertFalse(ledgersOpened.contains(info.ledgers.get(1).ledgerId)); + Assert.assertFalse(ledgersOpened.contains(info.ledgers.get(2).ledgerId)); + ledgersOpened.clear(); + + // compact the topic again + compactor.compact(topic).get(); + + // shouldn't have opened first ledger (already compacted), penultimate would have some uncompacted data. + // last ledger already open for writing + Assert.assertFalse(ledgersOpened.contains(info.ledgers.get(0).ledgerId)); + Assert.assertTrue(ledgersOpened.contains(info.ledgers.get(1).ledgerId)); + Assert.assertFalse(ledgersOpened.contains(info.ledgers.get(2).ledgerId)); + + // all three messages should be there when we read compacted + try (Consumer consumer = pulsarClient.newConsumer().topic(topic) + .subscriptionName("sub1").readCompacted(true).subscribe()){ + Message message1 = consumer.receive(); + Assert.assertEquals(message1.getKey(), "key0"); + Assert.assertEquals(new String(message1.getData()), "my-message-0"); + + Message message2 = consumer.receive(); + Assert.assertEquals(message2.getKey(), "key1"); + Assert.assertEquals(new String(message2.getData()), "my-message-1"); + + Message message3 = consumer.receive(); + Assert.assertEquals(message3.getKey(), "key2"); + Assert.assertEquals(new String(message3.getData()), "my-message-2"); + } + } } -- To stop receiving notification emails like this one, please contact mme...@apache.org.