This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 16cf199ffdd [improve][broker] Add createTopicIfDoesNotExist option to RawReader constructor (#22264) 16cf199ffdd is described below commit 16cf199ffdddf32f47cb3cd92934e45723cee987 Author: Cong Zhao <zhaoc...@apache.org> AuthorDate: Thu Mar 14 16:45:15 2024 +0800 [improve][broker] Add createTopicIfDoesNotExist option to RawReader constructor (#22264) --- .../org/apache/pulsar/client/api/RawReader.java | 8 +++++++- .../apache/pulsar/client/impl/RawReaderImpl.java | 10 +++++----- .../org/apache/pulsar/compaction/Compactor.java | 2 +- .../apache/pulsar/client/impl/RawReaderTest.java | 23 ++++++++++++++++++++++ 4 files changed, 36 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java index 92a2c89f9bc..b7805c36b3b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java @@ -32,8 +32,14 @@ public interface RawReader { */ static CompletableFuture<RawReader> create(PulsarClient client, String topic, String subscription) { + return create(client, topic, subscription, true); + } + + static CompletableFuture<RawReader> create(PulsarClient client, String topic, String subscription, + boolean createTopicIfDoesNotExist) { CompletableFuture<Consumer<byte[]>> future = new CompletableFuture<>(); - RawReader r = new RawReaderImpl((PulsarClientImpl) client, topic, subscription, future); + RawReader r = + new RawReaderImpl((PulsarClientImpl) client, topic, subscription, future, createTopicIfDoesNotExist); return future.thenApply(__ -> r); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java index f6523241399..3d7ad9f5865 100644 --- 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java @@ -51,7 +51,8 @@ public class RawReaderImpl implements RawReader { private RawConsumerImpl consumer; public RawReaderImpl(PulsarClientImpl client, String topic, String subscription, - CompletableFuture<Consumer<byte[]>> consumerFuture) { + CompletableFuture<Consumer<byte[]>> consumerFuture, + boolean createTopicIfDoesNotExist) { consumerConfiguration = new ConsumerConfigurationData<>(); consumerConfiguration.getTopicNames().add(topic); consumerConfiguration.setSubscriptionName(subscription); @@ -61,8 +62,7 @@ public class RawReaderImpl implements RawReader { consumerConfiguration.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest); consumerConfiguration.setAckReceiptEnabled(true); - consumer = new RawConsumerImpl(client, consumerConfiguration, - consumerFuture); + consumer = new RawConsumerImpl(client, consumerConfiguration, consumerFuture, createTopicIfDoesNotExist); } @Override @@ -111,7 +111,7 @@ public class RawReaderImpl implements RawReader { final Queue<CompletableFuture<RawMessage>> pendingRawReceives; RawConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<byte[]> conf, - CompletableFuture<Consumer<byte[]>> consumerFuture) { + CompletableFuture<Consumer<byte[]>> consumerFuture, boolean createTopicIfDoesNotExist) { super(client, conf.getSingleTopic(), conf, @@ -123,7 +123,7 @@ public class RawReaderImpl implements RawReader { MessageId.earliest, 0 /* startMessageRollbackDurationInSec */, Schema.BYTES, null, - false + createTopicIfDoesNotExist ); incomingRawMessages = new GrowableArrayBlockingQueue<>(); pendingRawReceives = new ConcurrentLinkedQueue<>(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java index e93a642c76e..983443432ff 100644 --- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java @@ -56,7 +56,7 @@ public abstract class Compactor { } public CompletableFuture<Long> compact(String topic) { - return RawReader.create(pulsar, topic, COMPACTION_SUBSCRIPTION).thenComposeAsync( + return RawReader.create(pulsar, topic, COMPACTION_SUBSCRIPTION, false).thenComposeAsync( this::compactAndCloseReader, scheduler); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java index de011ea490c..ab8c35c440e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -36,14 +37,17 @@ import org.apache.commons.lang3.tuple.ImmutableTriple; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.RawMessage; import org.apache.pulsar.client.api.RawReader; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import 
org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.protocol.Commands; import org.awaitility.Awaitility; import org.testng.Assert; @@ -461,4 +465,23 @@ public class RawReaderTest extends MockedPulsarServiceBaseTest { } } } + + @Test + public void testAutoCreateTopic() throws ExecutionException, InterruptedException, PulsarAdminException { + String topic = "persistent://my-property/my-ns/" + BrokerTestUtil.newUniqueName("reader"); + + RawReader reader = RawReader.create(pulsarClient, topic, subscription).get(); + TopicStats stats = admin.topics().getStats(topic); + Assert.assertNotNull(stats); + reader.closeAsync().join(); + + String topic2 = "persistent://my-property/my-ns/" + BrokerTestUtil.newUniqueName("reader"); + try { + reader = RawReader.create(pulsarClient, topic2, subscription, false).get(); + Assert.fail(); + } catch (Exception e) { + Assert.assertTrue(e.getCause() instanceof PulsarClientException.TopicDoesNotExistException); + } + reader.closeAsync().join(); + } }