lhotari commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r973057855


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java:
##########
@@ -68,4 +69,28 @@ public static CompletableFuture<Void> waitForAll(List<CompletableFuture<Void>> f
 
         return compositeFuture;
     }
+
+    public static <T> CompletableFuture<T> executeWithRetry(Supplier<CompletableFuture<T>> op,
+                                                            Class<? extends Exception> needRetryExceptionClass) {
+        CompletableFuture<T> resultFuture = new CompletableFuture<>();
+        op.get().whenComplete((res, ex) -> {
+            if (ex == null) {
+                resultFuture.complete(res);
+            } else {
+                if (needRetryExceptionClass.isAssignableFrom(ex.getClass())) {
+                    executeWithRetry(op, needRetryExceptionClass).whenComplete((res2, ex2) -> {
+                        if (ex2 == null) {
+                            resultFuture.complete(res2);
+                        } else {
+                            resultFuture.completeExceptionally(ex2);
+                        }
+                    });
+                    return;
+                }
+                resultFuture.completeExceptionally(ex);
+            }
+        });
+
+        return resultFuture;
+    }

Review Comment:
   Retrying without any backoff is usually not recommended. Could we use 
exponential backoff here?
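
A minimal sketch of what a backoff-based variant could look like, using only JDK classes. The class name `RetrySketch`, the method `executeWithBackoff`, and the parameters `scheduler`, `delayMs`, `maxDelayMs`, and `maxAttempts` are all hypothetical and not part of the PR; the sketch also adds an attempt limit, which the original method does not have.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public final class RetrySketch {
    private RetrySketch() {}

    // Hypothetical variant of executeWithRetry: waits delayMs before the first
    // retry, doubles the delay after each failed attempt (capped at maxDelayMs),
    // and gives up after maxAttempts attempts in total.
    public static <T> CompletableFuture<T> executeWithBackoff(
            Supplier<CompletableFuture<T>> op,
            Class<? extends Exception> retryOn,
            ScheduledExecutorService scheduler,
            long delayMs, long maxDelayMs, int maxAttempts) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(op, retryOn, scheduler, delayMs, maxDelayMs, maxAttempts, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> op,
            Class<? extends Exception> retryOn,
            ScheduledExecutorService scheduler,
            long delayMs, long maxDelayMs, int attemptsLeft,
            CompletableFuture<T> result) {
        CompletableFuture<T> future;
        try {
            future = op.get();
        } catch (Throwable t) {
            // The supplier itself threw instead of returning a failed future.
            result.completeExceptionally(t);
            return;
        }
        future.whenComplete((res, ex) -> {
            if (ex == null) {
                result.complete(res);
                return;
            }
            // Dependent stages wrap failures in CompletionException; unwrap it
            // so the class check sees the real cause.
            Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                    ? ex.getCause() : ex;
            if (attemptsLeft <= 1 || !retryOn.isInstance(cause)) {
                result.completeExceptionally(cause);
                return;
            }
            long nextDelayMs = Math.min(delayMs * 2, maxDelayMs);
            try {
                scheduler.schedule(
                        () -> attempt(op, retryOn, scheduler, nextDelayMs, maxDelayMs,
                                attemptsLeft - 1, result),
                        delayMs, TimeUnit.MILLISECONDS);
            } catch (RuntimeException rejected) {
                // For example, the scheduler has already been shut down.
                result.completeExceptionally(rejected);
            }
        });
    }
}
```

Scheduling the next attempt on an executor, rather than recursing inside the callback, also keeps the call stack flat when the operation fails synchronously many times in a row.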



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -324,24 +328,27 @@ public Map<String, String> getCursorProperties() {
         return cursorProperties;
     }
 
-    @Override
-    public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
+    private CompletableFuture<Void> computeCursorProperties(
+            final Function<Map<String, String>, Map<String, String>> updateFunction) {
         CompletableFuture<Void> updateCursorPropertiesResult = new CompletableFuture<>();
-        ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback<>() {
-            @Override
-            public void operationComplete(ManagedCursorInfo info, Stat stat) {
-                ManagedCursorInfo copy = ManagedCursorInfo
-                        .newBuilder(info)
-                        .clearCursorProperties()
-                        .addAllCursorProperties(buildStringPropertiesMap(cursorProperties))
-                        .build();
-                ledger.getStore().asyncUpdateCursorInfo(ledger.getName(),
-                        name, copy, stat, new MetaStoreCallback<>() {
+
+        final Stat lastCursorLedgerStat = ManagedCursorImpl.this.cursorLedgerStat;
+
+        Map<String, String> newProperties = updateFunction.apply(ManagedCursorImpl.this.cursorProperties);
+        ManagedCursorInfo copy = ManagedCursorInfo
+                .newBuilder(ManagedCursorImpl.this.managedCursorInfo)
+                .clearCursorProperties()
+                .addAllCursorProperties(buildStringPropertiesMap(newProperties))
+                .build();
+
+        ledger.getStore().asyncUpdateCursorInfo(ledger.getName(),
+                name, copy, lastCursorLedgerStat, new MetaStoreCallback<>() {
                     @Override
                     public void operationComplete(Void result, Stat stat) {
                         log.info("[{}] Updated ledger cursor: {} properties {}", ledger.getName(),
                                  name, cursorProperties);
-                        ManagedCursorImpl.this.cursorProperties = cursorProperties;
+                        ManagedCursorImpl.this.managedCursorInfo = copy;
+                        ManagedCursorImpl.this.cursorProperties = Collections.unmodifiableMap(newProperties);

Review Comment:
   `Collections.unmodifiableMap` alone won't make a map immutable: it only 
returns a read-only view, so changes made to the underlying map through 
another reference remain visible. Is `newProperties` already a copy?
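
To illustrate the concern with plain JDK collections, here is a small sketch. The class `UnmodifiableViewSketch` and its two helper methods are hypothetical and not part of the PR; whether `updateFunction` in the PR returns a fresh map is exactly the open question, so this only shows the general behaviour.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public final class UnmodifiableViewSketch {
    private UnmodifiableViewSketch() {}

    // Wraps the caller's map: later writes to `source` show through the wrapper.
    public static Map<String, String> wrapOnly(Map<String, String> source) {
        return Collections.unmodifiableMap(source);
    }

    // Copies first, so the result is detached from `source`.
    public static Map<String, String> copyThenWrap(Map<String, String> source) {
        return Collections.unmodifiableMap(new HashMap<>(source));
    }

    public static void main(String[] args) {
        Map<String, String> source = new HashMap<>();
        source.put("a", "1");

        Map<String, String> view = wrapOnly(source);
        Map<String, String> snapshot = copyThenWrap(source);

        // Mutate the original map after both wrappers were created.
        source.put("b", "2");

        System.out.println(view.containsKey("b"));     // prints "true"
        System.out.println(snapshot.containsKey("b")); // prints "false"
    }
}
```

On Java 10 and later, `Map.copyOf(source)` is a shorter way to take the snapshot, with the caveat that it rejects null keys and values.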



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.