315157973 commented on a change in pull request #11333: URL: https://github.com/apache/pulsar/pull/11333#discussion_r670967460
########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java ########## @@ -140,11 +143,21 @@ public PersistentSubscription(PersistentTopic topic, String subscriptionName, Ma this.fullName = MoreObjects.toStringHelper(this).add("topic", topicName).add("name", subName).toString(); this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, subscriptionName, cursor, this); this.setReplicated(replicated); + TopicName topicName1 = TopicName.get(getTopicName()); + Optional<Policies> policies = null; + try { + policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache() + .get(AdminResource.path(POLICIES, topicName1.getNamespaceObject().toString())); + } catch (Exception e) { + e.printStackTrace(); Review comment: Same as above ########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java ########## @@ -343,7 +351,16 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS this.backloggedCursorThresholdEntries = brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold(); this.transactionCompletableFuture = new CompletableFuture<>(); - if (brokerService.pulsar().getConfiguration().isTransactionCoordinatorEnabled()) { + TopicName topicName = TopicName.get(topic); + Optional<Policies> policies = null; + try { + policies = brokerService.pulsar().getConfigurationCache().policiesCache() + .get(AdminResource.path(POLICIES, topicName.getNamespaceObject().toString())); + } catch (Exception e) { + e.printStackTrace(); + } + if (brokerService.pulsar().getConfiguration().isTransactionCoordinatorEnabled() + && policies.get().transaction_enable) { this.transactionBuffer = brokerService.getPulsar() Review comment: If we do not create a transactionBuffer, will the client still send data to the TC? 
########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java ########## @@ -394,8 +416,16 @@ public void acknowledgeMessage(List<Position> positions, AckType ackType, Map<St } } } - + TopicName topicName1 = TopicName.get(getTopicName()); + Optional<Policies> policies = null; + try { + policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache() + .get(AdminResource.path(POLICIES, topicName1.getNamespaceObject().toString())); + } catch (Exception e) { + e.printStackTrace(); + } Review comment: Should we abstract it as a method? ########## File path: pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java ########## @@ -3616,6 +3616,66 @@ public void clearProperties(String namespace) throws PulsarAdminException { return asyncDeleteRequest(path); } + @Override + public void setTransactionEnable(String namespace, boolean transactionEnable) throws PulsarAdminException { + try { + setTransactionEnableAsync(namespace, transactionEnable) + .get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture<Void> setTransactionEnableAsync(String namespace, boolean transactionEnable) { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, "transactionEnabled"); + return asyncPostRequest(path, Entity.entity(transactionEnable, MediaType.APPLICATION_JSON)); + } + + @Override + public boolean getTransactionEnabled(String namespace) throws PulsarAdminException { + try { + return getTransactionEnabledAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw 
(PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture<Boolean> getTransactionEnabledAsync(String namespace) { Review comment: We also need to add the corresponding admin CLI command (CMD) ########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java ########## @@ -482,9 +486,18 @@ private void checkCanRemovePendingAcksAndHandle(PositionImpl position, MessageId } private boolean isTransactionEnabled() { + TopicName topicName = TopicName.get(subscription.getTopicName()); + Optional<Policies> policies = null; + try { + policies = cnx.getBrokerService().pulsar().getConfigurationCache().policiesCache() + .get(AdminResource.path(POLICIES, topicName.getNamespaceObject().toString())); + } catch (Exception e) { + e.printStackTrace(); Review comment: Please log the error instead of calling printStackTrace() ########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java ########## @@ -482,9 +486,18 @@ private void checkCanRemovePendingAcksAndHandle(PositionImpl position, MessageId } private boolean isTransactionEnabled() { + TopicName topicName = TopicName.get(subscription.getTopicName()); + Optional<Policies> policies = null; + try { + policies = cnx.getBrokerService().pulsar().getConfigurationCache().policiesCache() + .get(AdminResource.path(POLICIES, topicName.getNamespaceObject().toString())); + } catch (Exception e) { + e.printStackTrace(); + } return subscription instanceof PersistentSubscription && ((PersistentTopic) subscription.getTopic()) - .getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled(); + .getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled() + && policies.get().transaction_enable; Review comment: There may be an NPE here, because policies can be null -- This is an automated message from the Apache Git 
Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org