This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.5 by this push:
     new 41d1d6e  [Issue 6394] Add configuration to disable auto creation of 
subscriptions (#6456)
41d1d6e is described below

commit 41d1d6ecb9b03c1cecadd9222b82d3f197e4c9b6
Author: Sijie Guo <[email protected]>
AuthorDate: Wed Mar 4 17:21:53 2020 -0800

    [Issue 6394] Add configuration to disable auto creation of subscriptions 
(#6456)
    
    ### Motivation
    
    Fixes #6394
    
    ### Modifications
    
    - provide a flag `allowAutoSubscriptionCreation` in `ServiceConfiguration`, 
defaults to `true`
    - when `allowAutoSubscriptionCreation` is disabled and the specified 
subscription (`Durable`) on the topic does not exist when trying to subscribe 
via a consumer, the server should reject the request directly by 
`handleSubscribe` in `ServerCnx`
    - create the subscription on the coordination topic if it does not exist 
when init `WorkerService`
    
    (cherry picked from commit c3292a611c9cbf2b17c96c5317d8f20247eb1f41)
---
 conf/broker.conf                                   |  3 ++
 conf/standalone.conf                               |  3 ++
 .../apache/pulsar/broker/ServiceConfiguration.java |  5 +++
 .../broker/service/BrokerServiceException.java     |  6 ++++
 .../apache/pulsar/broker/service/ServerCnx.java    | 13 +++++--
 .../BrokerServiceAutoTopicCreationTest.java        | 42 ++++++++++++++++++++++
 .../pulsar/functions/worker/WorkerService.java     |  5 +++
 site2/docs/reference-configuration.md              |  1 +
 8 files changed, 76 insertions(+), 2 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 62107cb..fc40a15 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -91,6 +91,9 @@ allowAutoTopicCreation=true
 # The type of topic that is allowed to be automatically 
created.(partitioned/non-partitioned)
 allowAutoTopicCreationType=non-partitioned
 
+# Enable subscription auto creation if new consumer connected (disable auto 
creation with value false)
+allowAutoSubscriptionCreation=true
+
 # The number of partitioned topics that is allowed to be automatically created 
if allowAutoTopicCreationType is partitioned.
 defaultNumPartitions=1
 
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 6619e90..3fa855c 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -728,6 +728,9 @@ allowAutoTopicCreation=true
 # The type of topic that is allowed to be automatically 
created.(partitioned/non-partitioned)
 allowAutoTopicCreationType=non-partitioned
 
+# Enable subscription auto creation if new consumer connected (disable auto 
creation with value false)
+allowAutoSubscriptionCreation=true
+
 # The number of partitioned topics that is allowed to be automatically created 
if allowAutoTopicCreationType is partitioned.
 defaultNumPartitions=1
 
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 68d56fe..c13089b 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1008,6 +1008,11 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     )
     private String allowAutoTopicCreationType = "non-partitioned";
     @FieldContext(
+        category = CATEGORY_STORAGE_ML,
+        doc = "Allow automated creation of subscriptions if set to true 
(default value)."
+    )
+    private boolean allowAutoSubscriptionCreation = true;
+    @FieldContext(
             category = CATEGORY_STORAGE_ML,
             doc = "The number of partitioned topics that is allowed to be 
automatically created"
                     + "if allowAutoTopicCreationType is partitioned."
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
index e794718..b4bfed5 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
@@ -123,6 +123,12 @@ public class BrokerServiceException extends Exception {
         }
     }
 
+    public static class SubscriptionNotFoundException extends 
BrokerServiceException {
+        public SubscriptionNotFoundException(String msg) {
+            super(msg);
+        }
+    }
+
     public static class SubscriptionBusyException extends 
BrokerServiceException {
         public SubscriptionBusyException(String msg) {
             super(msg);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index c1a90be..39da53d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -63,6 +63,7 @@ import 
org.apache.pulsar.broker.authentication.AuthenticationState;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
+import 
org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionNotFoundException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicNotFoundException;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import 
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
@@ -740,7 +741,6 @@ public class ServerCnx extends PulsarHandler {
                 subscribe.getStartMessageId().getLedgerId(), 
subscribe.getStartMessageId().getEntryId(),
                 subscribe.getStartMessageId().getPartition(), 
subscribe.getStartMessageId().getBatchIndex())
                 : null;
-        final String subscription = subscribe.getSubscription();
         final int priorityLevel = subscribe.hasPriorityLevel() ? 
subscribe.getPriorityLevel() : 0;
         final boolean readCompacted = subscribe.getReadCompacted();
         final Map<String, String> metadata = 
CommandUtils.metadataFromCommand(subscribe);
@@ -766,7 +766,7 @@ public class ServerCnx extends PulsarHandler {
                 if (service.isAuthorizationEnabled()) {
                     authorizationFuture = 
service.getAuthorizationService().canConsumeAsync(topicName,
                             originalPrincipal != null ? originalPrincipal : 
authRole, authenticationData,
-                            subscription);
+                            subscriptionName);
                 } else {
                     authorizationFuture = 
CompletableFuture.completedFuture(true);
                 }
@@ -829,6 +829,15 @@ public class ServerCnx extends PulsarHandler {
 
                                     Topic topic = optTopic.get();
 
+                                    boolean rejectSubscriptionIfDoesNotExist = 
isDurable
+                                        && 
!service.pulsar().getConfig().isAllowAutoSubscriptionCreation()
+                                        && 
!topic.getSubscriptions().containsKey(subscriptionName);
+
+                                    if (rejectSubscriptionIfDoesNotExist) {
+                                        return FutureUtil
+                                            .failedFuture(new 
SubscriptionNotFoundException("Subscription does not exist"));
+                                    }
+
                                     if (schema != null) {
                                         return 
topic.addSchemaIfIdleOrCheckCompatible(schema)
                                             .thenCompose(v -> 
topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
index 275ec3f..3e906ef 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
@@ -141,6 +141,48 @@ public class BrokerServiceAutoTopicCreationTest extends 
BrokerTestBase{
         
assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicString));
     }
 
+    @Test
+    public void testAutoSubscriptionCreationDisable() throws Exception{
+        pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false);
+
+        final String topicName = "persistent://prop/ns-abc/test-subtopic";
+        final String subscriptionName = "test-subtopic-sub";
+
+        admin.topics().createNonPartitionedTopic(topicName);
+
+        try {
+            
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
+            fail("Subscribe operation should have failed");
+        } catch (Exception e) {
+            assertTrue(e instanceof PulsarClientException);
+        }
+        
assertFalse(admin.topics().getSubscriptions(topicName).contains(subscriptionName));
+
+        // Reset to default
+        pulsar.getConfiguration().setAllowAutoSubscriptionCreation(true);
+    }
+
+    @Test
+    public void testSubscriptionCreationWithAutoCreationDisable() throws 
Exception{
+        pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false);
+
+        final String topicName = "persistent://prop/ns-abc/test-subtopic";
+        final String subscriptionName = "test-subtopic-sub";
+
+        admin.topics().createNonPartitionedTopic(topicName);
+        
assertFalse(admin.topics().getSubscriptions(topicName).contains(subscriptionName));
+
+        // Create the subscription by PulsarAdmin
+        admin.topics().createSubscription(topicName, subscriptionName, 
MessageId.earliest);
+        
assertTrue(admin.topics().getSubscriptions(topicName).contains(subscriptionName));
+
+        // Subscribe operation should be successful
+        
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
+
+        // Reset to default
+        pulsar.getConfiguration().setAllowAutoSubscriptionCreation(true);
+    }
+
     /**
      * CheckAllowAutoCreation's default value is false.
      * So using getPartitionedTopicMetadata() directly will not produce 
partitioned topic
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index fdfb7de..b23c707 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -34,6 +34,7 @@ import 
org.apache.distributedlog.api.namespace.NamespaceBuilder;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 
@@ -162,6 +163,10 @@ public class WorkerService {
             this.connectorsManager = new ConnectorsManager(workerConfig);
 
             //create membership manager
+            String coordinationTopic = 
workerConfig.getClusterCoordinationTopic();
+            if 
(!brokerAdmin.topics().getSubscriptions(coordinationTopic).contains(MembershipManager.COORDINATION_TOPIC_SUBSCRIPTION))
 {
+                brokerAdmin.topics().createSubscription(coordinationTopic, 
MembershipManager.COORDINATION_TOPIC_SUBSCRIPTION, MessageId.earliest);
+            }
             this.membershipManager = new MembershipManager(this, this.client, 
this.brokerAdmin);
 
             // create function runtime manager
diff --git a/site2/docs/reference-configuration.md 
b/site2/docs/reference-configuration.md
index 642a7c0..3d0f242 100644
--- a/site2/docs/reference-configuration.md
+++ b/site2/docs/reference-configuration.md
@@ -129,6 +129,7 @@ Pulsar brokers are responsible for handling incoming 
messages from producers, di
 |backlogQuotaDefaultLimitGB|  Default per-topic backlog quota limit |10|
 |allowAutoTopicCreation| Enable topic auto creation if a new producer or 
consumer connected |true|
 |allowAutoTopicCreationType| The topic type (partitioned or non-partitioned) 
that is allowed to be automatically created. |Partitioned|
+|allowAutoSubscriptionCreation| Enable subscription auto creation if a new 
consumer connected |true|
 |defaultNumPartitions| The number of partitioned topics that is allowed to be 
automatically created if `allowAutoTopicCreationType` is partitioned |1|
 |brokerDeleteInactiveTopicsEnabled| Enable the deletion of inactive topics  
|true|
 |brokerDeleteInactiveTopicsFrequencySeconds|  How often to check for inactive 
topics  |60|

Reply via email to