RobertIndie commented on code in PR #23686:
URL: https://github.com/apache/pulsar/pull/23686#discussion_r1875254225


##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java:
##########
@@ -321,22 +325,25 @@ public void accept(Notification t) {
         }
     }
 
-    private CompletableFuture<T> executeWithRetry(Supplier<CompletableFuture<T>> op, String key) {
-        CompletableFuture<T> result = new CompletableFuture<>();
+    private void execute(Supplier<CompletableFuture<T>> op, String key, CompletableFuture<T> result, Backoff backoff) {
         op.get().thenAccept(result::complete).exceptionally((ex) -> {
             if (ex.getCause() instanceof BadVersionException) {
                 // if resource is updated by other than metadata-cache then metadata-cache will get bad-version
                 // exception. so, try to invalidate the cache and try one more time.
                 objCache.synchronous().invalidate(key);
-                op.get().thenAccept(result::complete).exceptionally((ex1) -> {
-                    result.completeExceptionally(ex1.getCause());
-                    return null;
-                });
+                backoffExecutor.schedule(() -> execute(op, key, result, backoff), backoff.next(),
+                        TimeUnit.MILLISECONDS);
                 return null;
             }
             result.completeExceptionally(ex.getCause());
             return null;
         });
+    }
+
+    private CompletableFuture<T> executeWithRetry(Supplier<CompletableFuture<T>> op, String key) {
+        final var backoff = new Backoff(100, TimeUnit.MILLISECONDS, 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);

Review Comment:
   The question now is whether we should throw the ConcurrentModificationException/BadVersionException to the caller at all. From a design perspective, and according to [the readModifyUpdate documentation](https://github.com/apache/pulsar/blob/216b83008deb469e0fc55ed8117f0c393ebcb0ac/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java#L108), it seems that MetadataCacheImpl should be responsible for retrying and should not throw this exception.
   
   Since readModifyUpdate does not require the caller to provide a version, it does not seem reasonable to throw a BadVersionException/ConcurrentModificationException to the caller. If the caller needs to ensure that data is not accidentally overwritten, throwing a BadVersionException/ConcurrentModificationException will not solve that problem either. Therefore, unless the caller can explicitly pass in a version, we cannot provide that semantic.
   
   Besides, I found that introducing unlimited retries in MetadataCache is also not reasonable. We can use `metadataStoreOperationTimeoutSeconds` as the mandatory stop time for retries and throw a TimeoutException if all retries fail. We can also reduce the maximum timeout, following @BewareMyPower's [suggestion](https://github.com/streamnative/ursa-storage/pull/548#discussion_r1873286488). WDYT? @codelipenghui @BewareMyPower
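
   To make the proposal concrete, here is a rough, standalone sketch of a deadline-bounded retry. It is not the PR's code: `BoundedRetrySketch`, its parameters, and the nested `BadVersionException` are hypothetical stand-ins so the example compiles on its own, the doubling delay replaces Pulsar's `Backoff` class, and the cache invalidation step is omitted. In MetadataCacheImpl, `timeoutMs` would presumably be derived from `metadataStoreOperationTimeoutSeconds`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

class BoundedRetrySketch {

    /** Hypothetical stand-in for Pulsar's BadVersionException so the sketch compiles alone. */
    static class BadVersionException extends RuntimeException {
        BadVersionException(String message) {
            super(message);
        }
    }

    /** Retries op on BadVersionException until it succeeds or timeoutMs has elapsed. */
    static <T> CompletableFuture<T> executeWithRetry(Supplier<CompletableFuture<T>> op,
                                                     ScheduledExecutorService scheduler,
                                                     long initialDelayMs, long maxDelayMs,
                                                     long timeoutMs) {
        CompletableFuture<T> result = new CompletableFuture<>();
        long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        attempt(op, scheduler, result, initialDelayMs, maxDelayMs, deadlineNanos);
        return result;
    }

    private static <T> void attempt(Supplier<CompletableFuture<T>> op,
                                    ScheduledExecutorService scheduler,
                                    CompletableFuture<T> result,
                                    long delayMs, long maxDelayMs, long deadlineNanos) {
        CompletableFuture<T> future;
        try {
            future = op.get();
        } catch (RuntimeException e) {
            // The supplier itself failed before returning a future.
            result.completeExceptionally(e);
            return;
        }
        future.whenComplete((value, ex) -> {
            if (ex == null) {
                result.complete(value);
                return;
            }
            // Dependent stages wrap failures in CompletionException; unwrap if so.
            Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                    ? ex.getCause() : ex;
            if (!(cause instanceof BadVersionException)) {
                // Not a version conflict: propagate without retrying.
                result.completeExceptionally(cause);
                return;
            }
            long remainingNanos = deadlineNanos - System.nanoTime();
            if (remainingNanos <= TimeUnit.MILLISECONDS.toNanos(delayMs)) {
                // The next retry would start at or past the deadline: give up.
                TimeoutException timeout =
                        new TimeoutException("Retries exhausted before the operation timeout");
                timeout.initCause(cause);
                result.completeExceptionally(timeout);
                return;
            }
            long nextDelayMs = Math.min(delayMs * 2, maxDelayMs);
            try {
                scheduler.schedule(
                        () -> attempt(op, scheduler, result, nextDelayMs, maxDelayMs, deadlineNanos),
                        delayMs, TimeUnit.MILLISECONDS);
            } catch (RuntimeException e) {
                // For example, the scheduler was shut down and rejected the task.
                result.completeExceptionally(e);
            }
        });
    }
}
```

   With this shape the caller never sees BadVersionException: the future either completes with the value, fails with a non-retryable error, or fails with a TimeoutException whose cause is the last conflict.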



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
