merlimat closed pull request #1540: Compaction sets read compacted flag when reading
URL: https://github.com/apache/incubator-pulsar/pull/1540
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java index 519b3b410c..4a914778fd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java @@ -54,6 +54,7 @@ public RawReaderImpl(PulsarClientImpl client, String topic, String subscription, consumerConfiguration.setSubscriptionName(subscription); consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive); consumerConfiguration.setReceiverQueueSize(DEFAULT_RECEIVER_QUEUE_SIZE); + consumerConfiguration.setReadCompacted(true); consumer = new RawConsumerImpl(client, consumerConfiguration, consumerFuture); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index d3c2ea14ac..72b8b747f9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -222,11 +222,11 @@ public static MockZooKeeper createMockZooKeeper() throws Exception { } public static NonClosableMockBookKeeper createMockBookKeeper(ZooKeeper zookeeper) throws Exception { - return new NonClosableMockBookKeeper(new ClientConfiguration(), zookeeper); + return spy(new NonClosableMockBookKeeper(new ClientConfiguration(), zookeeper)); } // Prevent the MockBookKeeper instance from being closed when the broker is restarted within a test - private static class NonClosableMockBookKeeper 
extends MockBookKeeper { + public static class NonClosableMockBookKeeper extends MockBookKeeper { public NonClosableMockBookKeeper(ClientConfiguration conf, ZooKeeper zk) throws Exception { super(zk); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index 22e74f21a9..8f64ef544d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -18,18 +18,26 @@ */ package org.apache.pulsar.compaction; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.client.api.OpenBuilder; +import org.apache.bookkeeper.mledger.ManagedLedgerInfo; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageBuilder; @@ -40,16 +48,20 @@ import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.PropertyAdmin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import com.google.common.collect.Lists; import com.google.common.collect.Sets; import 
com.google.common.util.concurrent.ThreadFactoryBuilder; public class CompactionTest extends MockedPulsarServiceBaseTest { + private static final Logger log = LoggerFactory.getLogger(CompactionTest.class); + private ScheduledExecutorService compactionScheduler; private BookKeeper bk; @@ -580,4 +592,102 @@ public void testEmptyPayloadDeletes() throws Exception { } } + + @Test + public void testCompactorReadsCompacted() throws Exception { + String topic = "persistent://my-property/use/my-ns/my-topic1"; + + // capture opened ledgers + Set<Long> ledgersOpened = Sets.newConcurrentHashSet(); + when(mockBookKeeper.newOpenLedgerOp()).thenAnswer( + (invocation) -> { + OpenBuilder builder = (OpenBuilder)spy(invocation.callRealMethod()); + when(builder.withLedgerId(anyLong())).thenAnswer( + (invocation2) -> { + ledgersOpened.add((Long)invocation2.getArguments()[0]); + return invocation2.callRealMethod(); + }); + return builder; + }); + + // subscribe before sending anything, so that we get all messages in sub1 + pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close(); + + // create the topic on the broker + try (Producer producerNormal = pulsarClient.newProducer().topic(topic).create()) { + producerNormal.send(MessageBuilder.create() + .setKey("key0") + .setContent("my-message-0".getBytes()).build()); + } + + // force ledger roll + pulsar.getBrokerService().getTopicReference(topic).get().close().get(); + + // write a message to avoid issue #1517 + try (Producer producerNormal = pulsarClient.newProducer().topic(topic).create()) { + producerNormal.send(MessageBuilder.create() + .setKey("key1") + .setContent("my-message-1".getBytes()).build()); + } + + // verify second ledger created + String managedLedgerName = ((PersistentTopic)pulsar.getBrokerService().getTopicReference(topic).get()) + .getManagedLedger().getName(); + ManagedLedgerInfo info = pulsar.getManagedLedgerFactory().getManagedLedgerInfo(managedLedgerName); + 
Assert.assertEquals(info.ledgers.size(), 2); + Assert.assertTrue(ledgersOpened.isEmpty()); // no ledgers should have been opened + + // compact the topic + Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic).get(); + + // should have opened all except last to read + Assert.assertTrue(ledgersOpened.contains(info.ledgers.get(0).ledgerId)); + Assert.assertFalse(ledgersOpened.contains(info.ledgers.get(1).ledgerId)); + ledgersOpened.clear(); + + // force broker to close resources for topic + pulsar.getBrokerService().getTopicReference(topic).get().close().get(); + + // write a message to avoid issue #1517 + try (Producer producerNormal = pulsarClient.newProducer().topic(topic).create()) { + producerNormal.send(MessageBuilder.create() + .setKey("key2") + .setContent("my-message-2".getBytes()).build()); + } + + info = pulsar.getManagedLedgerFactory().getManagedLedgerInfo(managedLedgerName); + Assert.assertEquals(info.ledgers.size(), 3); + + // should only have opened the penultimate ledger to get stat + Assert.assertFalse(ledgersOpened.contains(info.ledgers.get(0).ledgerId)); + Assert.assertFalse(ledgersOpened.contains(info.ledgers.get(1).ledgerId)); + Assert.assertFalse(ledgersOpened.contains(info.ledgers.get(2).ledgerId)); + ledgersOpened.clear(); + + // compact the topic again + compactor.compact(topic).get(); + + // shouldn't have opened first ledger (already compacted), penultimate would have some uncompacted data. 
+ // last ledger already open for writing + Assert.assertFalse(ledgersOpened.contains(info.ledgers.get(0).ledgerId)); + Assert.assertTrue(ledgersOpened.contains(info.ledgers.get(1).ledgerId)); + Assert.assertFalse(ledgersOpened.contains(info.ledgers.get(2).ledgerId)); + + // all three messages should be there when we read compacted + try (Consumer consumer = pulsarClient.newConsumer().topic(topic) + .subscriptionName("sub1").readCompacted(true).subscribe()){ + Message message1 = consumer.receive(); + Assert.assertEquals(message1.getKey(), "key0"); + Assert.assertEquals(new String(message1.getData()), "my-message-0"); + + Message message2 = consumer.receive(); + Assert.assertEquals(message2.getKey(), "key1"); + Assert.assertEquals(new String(message2.getData()), "my-message-1"); + + Message message3 = consumer.receive(); + Assert.assertEquals(message3.getKey(), "key2"); + Assert.assertEquals(new String(message3.getData()), "my-message-2"); + } + } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services