eolivelli commented on a change in pull request #11333:
URL: https://github.com/apache/pulsar/pull/11333#discussion_r671075036



##########
File path: conf/standalone.conf
##########
@@ -590,7 +590,7 @@ managedLedgerDefaultAckQuorum=1
 
 # How frequently to flush the cursor positions that were accumulated due to 
rate limiting. (seconds).
 # Default is 60 seconds
-managedLedgerCursorPositionFlushSeconds = 60
+managedLedgerCursorPositionFlushSeconds=60

Review comment:
       this change looks unrelated, can you please revert?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
##########
@@ -1853,6 +1853,32 @@ public void 
removeNamespaceResourceGroup(@PathParam("tenant") String tenant,
         validateNamespaceName(tenant, namespace);
         internalSetNamespaceResourceGroup(null);
     }
+    @POST
+    @Path("/{tenant}/{namespace}/transactionEnabled")
+    @ApiOperation(value = "Update boolean of whether allow transaction of  
namespace")

Review comment:
       "Enable or disable transactions on the given namespace"

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
##########
@@ -1853,6 +1853,32 @@ public void 
removeNamespaceResourceGroup(@PathParam("tenant") String tenant,
         validateNamespaceName(tenant, namespace);
         internalSetNamespaceResourceGroup(null);
     }
+    @POST
+    @Path("/{tenant}/{namespace}/transactionEnabled")
+    @ApiOperation(value = "Update boolean of whether allow transaction of  
namespace")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Namespace doesn't exist"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void setTransactionEnable(
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @ApiParam(value = "boolean of whether allow transaction of  
namespace", required = true)
+                    boolean transactionEnable) {
+        validateNamespaceName(tenant, namespace);
+        internalSetTransactionEnabled(transactionEnable);
+    }
+    @GET
+    @Path("/{tenant}/{namespace}/transactionEnabled")
+    @ApiOperation(value = "The boolean of whether allow transaction of  
namespace")

Review comment:
       "Return information about the activation of transactions on the given 
namespace"

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
##########
@@ -140,11 +143,21 @@ public PersistentSubscription(PersistentTopic topic, 
String subscriptionName, Ma
         this.fullName = MoreObjects.toStringHelper(this).add("topic", 
topicName).add("name", subName).toString();
         this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, 
subscriptionName, cursor, this);
         this.setReplicated(replicated);
+        TopicName topicName1 = TopicName.get(getTopicName());
+        Optional<Policies> policies = null;
+        try {
+            policies = 
topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
+                     .get(AdminResource.path(POLICIES, 
topicName1.getNamespaceObject().toString()));
+        } catch (Exception e) {
+            e.printStackTrace();

Review comment:
       We must throw an error here, because we cannot proceed if we do not have 
this piece of information.

##########
File path: 
pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
##########
@@ -125,6 +125,8 @@
     @SuppressWarnings("checkstyle:MemberName")
     public String resource_group_name = null;
 
+    @SuppressWarnings("checkstyle:MemberName")
+    public boolean transaction_enable = false;

Review comment:
       This should be "true" by default, otherwise when you upgrade from Pulsar 
2.8.0 to Pulsar 2.9.0 you will see Transactions disabled for every namespace.
   

##########
File path: 
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
##########
@@ -3616,6 +3616,66 @@ public void clearProperties(String namespace) throws 
PulsarAdminException {
         return asyncDeleteRequest(path);
     }
 
+    @Override
+    public void setTransactionEnable(String namespace, boolean 
transactionEnable) throws PulsarAdminException {
+        try {
+            setTransactionEnableAsync(namespace, transactionEnable)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> setTransactionEnableAsync(String namespace, 
boolean transactionEnable) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "transactionEnabled");
+        return asyncPostRequest(path, Entity.entity(transactionEnable, 
MediaType.APPLICATION_JSON));
+    }
+
+    @Override
+    public boolean getTransactionEnabled(String namespace) throws 
PulsarAdminException {
+        try {
+            return 
getTransactionEnabledAsync(namespace).get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Boolean> getTransactionEnabledAsync(String 
namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "transactionEnabled");
+        final CompletableFuture<Boolean> future = new CompletableFuture<>();
+        asyncGetRequest(path,

Review comment:
       How does this work during an upgrade?
   What happens if I make the request to a broker that is still at version 2.8.0?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to