315157973 commented on a change in pull request #11333:
URL: https://github.com/apache/pulsar/pull/11333#discussion_r670967460



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
##########
@@ -140,11 +143,21 @@ public PersistentSubscription(PersistentTopic topic, 
String subscriptionName, Ma
         this.fullName = MoreObjects.toStringHelper(this).add("topic", 
topicName).add("name", subName).toString();
         this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, 
subscriptionName, cursor, this);
         this.setReplicated(replicated);
+        TopicName topicName1 = TopicName.get(getTopicName());
+        Optional<Policies> policies = null;
+        try {
+            policies = 
topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
+                     .get(AdminResource.path(POLICIES, 
topicName1.getNamespaceObject().toString()));
+        } catch (Exception e) {
+            e.printStackTrace();

Review comment:
       Same as above: please log the error instead of calling 
e.printStackTrace().

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -343,7 +351,16 @@ public PersistentTopic(String topic, ManagedLedger ledger, 
BrokerService brokerS
         this.backloggedCursorThresholdEntries =
                 
brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
         this.transactionCompletableFuture = new CompletableFuture<>();
-        if 
(brokerService.pulsar().getConfiguration().isTransactionCoordinatorEnabled()) {
+        TopicName topicName = TopicName.get(topic);
+        Optional<Policies> policies = null;
+        try {
+            policies = 
brokerService.pulsar().getConfigurationCache().policiesCache()
+                    .get(AdminResource.path(POLICIES, 
topicName.getNamespaceObject().toString()));
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        if 
(brokerService.pulsar().getConfiguration().isTransactionCoordinatorEnabled()
+                && policies.get().transaction_enable) {
             this.transactionBuffer = brokerService.getPulsar()

Review comment:
       If we do not create a transactionBuffer, will the client still send data 
to the transaction coordinator (TC)?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
##########
@@ -394,8 +416,16 @@ public void acknowledgeMessage(List<Position> positions, 
AckType ackType, Map<St
                 }
             }
         }
-
+        TopicName topicName1 = TopicName.get(getTopicName());
+        Optional<Policies> policies = null;
+        try {
+            policies = 
topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
+                    .get(AdminResource.path(POLICIES, 
topicName1.getNamespaceObject().toString()));
+        } catch (Exception e) {
+            e.printStackTrace();
+        }

Review comment:
       Should we extract this repeated policies lookup into a method?

##########
File path: 
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
##########
@@ -3616,6 +3616,66 @@ public void clearProperties(String namespace) throws 
PulsarAdminException {
         return asyncDeleteRequest(path);
     }
 
+    @Override
+    public void setTransactionEnable(String namespace, boolean 
transactionEnable) throws PulsarAdminException {
+        try {
+            setTransactionEnableAsync(namespace, transactionEnable)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> setTransactionEnableAsync(String namespace, 
boolean transactionEnable) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "transactionEnabled");
+        return asyncPostRequest(path, Entity.entity(transactionEnable, 
MediaType.APPLICATION_JSON));
+    }
+
+    @Override
+    public boolean getTransactionEnabled(String namespace) throws 
PulsarAdminException {
+        try {
+            return 
getTransactionEnabledAsync(namespace).get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Boolean> getTransactionEnabledAsync(String 
namespace) {

Review comment:
       We also need to add the corresponding admin CLI command (CMD).

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
##########
@@ -482,9 +486,18 @@ private void 
checkCanRemovePendingAcksAndHandle(PositionImpl position, MessageId
     }
 
     private boolean isTransactionEnabled() {
+        TopicName topicName = TopicName.get(subscription.getTopicName());
+        Optional<Policies> policies = null;
+        try {
+            policies = 
cnx.getBrokerService().pulsar().getConfigurationCache().policiesCache()
+                    .get(AdminResource.path(POLICIES, 
topicName.getNamespaceObject().toString()));
+        } catch (Exception e) {
+            e.printStackTrace();

Review comment:
       Please log the error.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
##########
@@ -482,9 +486,18 @@ private void 
checkCanRemovePendingAcksAndHandle(PositionImpl position, MessageId
     }
 
     private boolean isTransactionEnabled() {
+        TopicName topicName = TopicName.get(subscription.getTopicName());
+        Optional<Policies> policies = null;
+        try {
+            policies = 
cnx.getBrokerService().pulsar().getConfigurationCache().policiesCache()
+                    .get(AdminResource.path(POLICIES, 
topicName.getNamespaceObject().toString()));
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
         return subscription instanceof PersistentSubscription
                 && ((PersistentTopic) subscription.getTopic())
-                
.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled();
+                
.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()
+                && policies.get().transaction_enable;

Review comment:
       There may be an NPE here, because policies can be null.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to