This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new da64d0c Compaction uses GetLastMessageId (#1286) da64d0c is described below commit da64d0cdfbd320c2364d37b4d37d918889cda8e7 Author: Ivan Kelly <iv...@apache.org> AuthorDate: Mon Feb 26 21:42:55 2018 +0100 Compaction uses GetLastMessageId (#1286) In two phase compaction, we read from the start of the topic to the end. Previously, we had no reliable way to knowing if we were at the end of the topic, so we used timeouts. However, with the new GetLastMessageId api, we can find how far we should read in a topic, and just stop when we get there. --- .../org/apache/pulsar/client/api/RawReader.java | 6 ++ .../apache/pulsar/client/impl/RawReaderImpl.java | 5 + .../pulsar/compaction/TwoPhaseCompactor.java | 45 ++++---- .../apache/pulsar/client/impl/RawReaderTest.java | 114 ++++++++++----------- .../apache/pulsar/client/impl/ConsumerImpl.java | 2 +- 5 files changed, 92 insertions(+), 80 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java index b5a4a65..15b03be 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java @@ -20,6 +20,7 @@ package org.apache.pulsar.client.api; import java.util.Map; import java.util.concurrent.CompletableFuture; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.RawReaderImpl; @@ -61,6 +62,11 @@ public interface RawReader { CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId, Map<String,Long> properties); /** + * Get the last message id available immediately available for reading + */ + CompletableFuture<MessageId> getLastMessageIdAsync(); + + /** * Close the raw reader. */ CompletableFuture<Void> closeAsync(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java index 04aa610..a048a5b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java @@ -78,6 +78,11 @@ public class RawReaderImpl implements RawReader { return consumer.closeAsync(); } + @Override + public CompletableFuture<MessageId> getLastMessageIdAsync() { + return consumer.getLastMessageIdAsync(); + } + static class RawConsumerImpl extends ConsumerImpl { final BlockingQueue<RawMessageAndCnx> incomingRawMessages; final Queue<CompletableFuture<RawMessage>> pendingRawReceives; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java index 957e197..10dc8b1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java @@ -75,15 +75,22 @@ public class TwoPhaseCompactor extends Compactor { private CompletableFuture<PhaseOneResult> phaseOne(RawReader reader) { Map<String,MessageId> latestForKey = new HashMap<>(); - CompletableFuture<PhaseOneResult> loopPromise = new CompletableFuture<>(); - phaseOneLoop(reader, Optional.empty(), Optional.empty(), latestForKey, loopPromise); + + reader.getLastMessageIdAsync().whenComplete( + (lastMessageId, exception) -> { + if (exception != null) { + loopPromise.completeExceptionally(exception); + } else { + phaseOneLoop(reader, Optional.empty(), lastMessageId, latestForKey, loopPromise); + } + }); return loopPromise; } private void phaseOneLoop(RawReader reader, Optional<MessageId> firstMessageId, - Optional<MessageId> lastMessageId, + MessageId lastMessageId, Map<String,MessageId> latestForKey, CompletableFuture<PhaseOneResult> loopPromise) { if (loopPromise.isDone()) { @@ -91,34 +98,30 @@ public class TwoPhaseCompactor extends Compactor { } CompletableFuture<RawMessage> future = reader.readNextAsync(); scheduleTimeout(future); - future.whenComplete( + future.whenCompleteAsync( (m, exception) -> { try { if (exception != null) { - if (exception instanceof TimeoutException - && firstMessageId.isPresent()) { - loopPromise.complete(new PhaseOneResult(firstMessageId.get(), - lastMessageId.get(), - latestForKey)); - } else { - loopPromise.completeExceptionally(exception); - } + loopPromise.completeExceptionally(exception); return; } - MessageId id = m.getMessageId(); String key = extractKey(m); latestForKey.put(key, id); - phaseOneLoop(reader, - Optional.of(firstMessageId.orElse(id)), - Optional.of(id), - latestForKey, loopPromise); + if (id.compareTo(lastMessageId) == 0) { + loopPromise.complete(new PhaseOneResult(firstMessageId.orElse(id), + id, latestForKey)); + } else { + phaseOneLoop(reader, + Optional.of(firstMessageId.orElse(id)), + lastMessageId, + latestForKey, loopPromise); + } } finally { m.close(); } - }); - + }, scheduler); } private void scheduleTimeout(CompletableFuture<RawMessage> future) { @@ -168,7 +171,7 @@ public class TwoPhaseCompactor extends Compactor { private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId> latestForKey, LedgerHandle lh, Semaphore outstanding, CompletableFuture<Void> promise) { - reader.readNextAsync().whenComplete( + reader.readNextAsync().whenCompleteAsync( (m, exception) -> { try { if (exception != null) { @@ -205,7 +208,7 @@ public class TwoPhaseCompactor extends Compactor { } finally { m.close(); } - }); + }, scheduler); } private CompletableFuture<LedgerHandle> createLedger(BookKeeper bk) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java index 19cdc31..53da2ee 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java @@ -131,16 +131,16 @@ public class RawReaderTest extends MockedPulsarServiceBaseTest { Set<String> keys = publishMessages(topic, numKeys); RawReader reader = RawReader.create(pulsarClient, topic, subscription).get(); - try { - while (true) { // should break out with TimeoutException - try (RawMessage m = reader.readNextAsync().get(1, TimeUnit.SECONDS)) { - Assert.assertTrue(keys.remove(extractKey(m))); + + MessageId lastMessageId = reader.getLastMessageIdAsync().get(); + while (true) { + try (RawMessage m = reader.readNextAsync().get()) { + Assert.assertTrue(keys.remove(extractKey(m))); + if (lastMessageId.compareTo(m.getMessageId()) == 0) { + break; } } - } catch (TimeoutException te) { - // ok } - Assert.assertTrue(keys.isEmpty()); } @@ -153,28 +153,27 @@ public class RawReaderTest extends MockedPulsarServiceBaseTest { Set<String> readKeys = new HashSet<>(); RawReader reader = RawReader.create(pulsarClient, topic, subscription).get(); - try { - while (true) { // should break out with TimeoutException - try (RawMessage m = reader.readNextAsync().get(1, TimeUnit.SECONDS)) { - readKeys.add(extractKey(m)); + MessageId lastMessageId = reader.getLastMessageIdAsync().get(); + while (true) { + try (RawMessage m = reader.readNextAsync().get()) { + readKeys.add(extractKey(m)); + if (lastMessageId.compareTo(m.getMessageId()) == 0) { + break; } } - } catch (TimeoutException te) { - // ok } Assert.assertEquals(readKeys.size(), numKeys); // seek to start, read all keys again, // assert that we read all keys we had read previously reader.seekAsync(MessageId.earliest).get(); - try { - while (true) { // should break out with TimeoutException - try (RawMessage m = reader.readNextAsync().get(1, TimeUnit.SECONDS)) { - Assert.assertTrue(readKeys.remove(extractKey(m))); + while (true) { + try (RawMessage m = reader.readNextAsync().get()) { + Assert.assertTrue(readKeys.remove(extractKey(m))); + if (lastMessageId.compareTo(m.getMessageId()) == 0) { + break; } } - } catch (TimeoutException te) { - // ok } Assert.assertTrue(readKeys.isEmpty()); } @@ -190,34 +189,34 @@ public class RawReaderTest extends MockedPulsarServiceBaseTest { RawReader reader = RawReader.create(pulsarClient, topic, subscription).get(); int i = 0; MessageId seekTo = null; - try { - while (true) { // should break out with TimeoutException - try (RawMessage m = reader.readNextAsync().get(1, TimeUnit.SECONDS)) { - i++; - if (i > numKeys/2) { - if (seekTo == null) { - seekTo = m.getMessageId(); - } - readKeys.add(extractKey(m)); + MessageId lastMessageId = reader.getLastMessageIdAsync().get(); + + while (true) { + try (RawMessage m = reader.readNextAsync().get()) { + i++; + if (i > numKeys/2) { + if (seekTo == null) { + seekTo = m.getMessageId(); } + readKeys.add(extractKey(m)); + } + if (lastMessageId.compareTo(m.getMessageId()) == 0) { + break; } } - } catch (TimeoutException te) { - // ok } Assert.assertEquals(readKeys.size(), numKeys/2); // seek to middle, read all keys again, // assert that we read all keys we had read previously reader.seekAsync(seekTo).get(); - try { - while (true) { // should break out with TimeoutException - try (RawMessage m = reader.readNextAsync().get(1, TimeUnit.SECONDS)) { - Assert.assertTrue(readKeys.remove(extractKey(m))); + while (true) { // should break out with TimeoutException + try (RawMessage m = reader.readNextAsync().get()) { + Assert.assertTrue(readKeys.remove(extractKey(m))); + if (lastMessageId.compareTo(m.getMessageId()) == 0) { + break; } } - } catch (TimeoutException te) { - // ok } Assert.assertTrue(readKeys.isEmpty()); } @@ -280,18 +279,18 @@ public class RawReaderTest extends MockedPulsarServiceBaseTest { } } }; - try { - while (true) { // should break out with TimeoutException - try (RawMessage m = reader.readNextAsync().get(1, TimeUnit.SECONDS)) { - if (RawBatchConverter.isBatch(m)) { - RawBatchConverter.explodeBatch(m).forEach(consumer); - } else { - consumer.accept(m); - } + MessageId lastMessageId = reader.getLastMessageIdAsync().get(); + while (true) { + try (RawMessage m = reader.readNextAsync().get()) { + if (RawBatchConverter.isBatch(m)) { + RawBatchConverter.explodeBatch(m).forEach(consumer); + } else { + consumer.accept(m); + } + if (lastMessageId.compareTo(m.getMessageId()) == 0) { + break; } } - } catch (TimeoutException te) { - // ok } Assert.assertTrue(keys.isEmpty()); } @@ -304,24 +303,23 @@ public class RawReaderTest extends MockedPulsarServiceBaseTest { Set<String> keys = publishMessages(topic, numKeys); - MessageId lastMessageId = null; RawReader reader = RawReader.create(pulsarClient, topic, subscription).get(); - try { - while (true) { // should break out with TimeoutException - try (RawMessage m = reader.readNextAsync().get(1, TimeUnit.SECONDS)) { - lastMessageId = m.getMessageId(); - Assert.assertTrue(keys.remove(extractKey(m))); + MessageId lastMessageId = reader.getLastMessageIdAsync().get(); + + while (true) { + try (RawMessage m = reader.readNextAsync().get()) { + Assert.assertTrue(keys.remove(extractKey(m))); + + if (lastMessageId.compareTo(m.getMessageId()) == 0) { + break; } } - } catch (TimeoutException te) { - // ok } - Assert.assertTrue(keys.isEmpty()); Map<String,Long> properties = new HashMap<>(); properties.put("foobar", 0xdeadbeefdecaL); - reader.acknowledgeCumulativeAsync(lastMessageId, properties).get(5, TimeUnit.SECONDS); + reader.acknowledgeCumulativeAsync(lastMessageId, properties).get(); PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic); ManagedLedger ledger = topicRef.getManagedLedger(); @@ -349,12 +347,12 @@ public class RawReaderTest extends MockedPulsarServiceBaseTest { } for (int i = 0; i < numKeys/2; i++) { - futures.remove(0).get(5, TimeUnit.SECONDS); // complete successfully + futures.remove(0).get(); // complete successfully } reader.closeAsync().get(); while (!futures.isEmpty()) { try { - futures.remove(0).get(5, TimeUnit.SECONDS); + futures.remove(0).get(); Assert.fail("Should have been cancelled"); } catch (CancellationException ee) { // correct behaviour diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index e9c7817..4e526cc 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1315,7 +1315,7 @@ public class ConsumerImpl extends ConsumerBase { return booleanFuture; } - private CompletableFuture<MessageId> getLastMessageIdAsync() { + CompletableFuture<MessageId> getLastMessageIdAsync() { if (getState() == State.Closing || getState() == State.Closed) { return FutureUtil .failedFuture(new PulsarClientException.AlreadyClosedException("Consumer was already closed")); -- To stop receiving notification emails like this one, please contact mme...@apache.org.