dsmiley commented on code in PR #4176:
URL: https://github.com/apache/solr/pull/4176#discussion_r2870231950


##########
solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java:
##########
@@ -105,7 +105,7 @@ public abstract class CloudSolrClient extends SolrClient {
   private final boolean directUpdatesToLeadersOnly;
   private final RequestReplicaListTransformerGenerator requestRLTGenerator;
   private final boolean parallelUpdates;
-  private ExecutorService threadPool =
+  private final ExecutorService threadPool =

Review Comment:
   Not critical, but there was no need for this field to be non-final.



##########
solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java:
##########
@@ -1658,41 +1657,34 @@ protected DocCollection getDocCollection(String collection, Integer expectedVers
   }
 
   private CompletableFuture<DocCollection> triggerCollectionRefresh(String collection) {
-    if (closed) {
-      ExpiringCachedDocCollection cacheEntry = collectionStateCache.peek(collection);
-      DocCollection cached = cacheEntry == null ? null : cacheEntry.cached;
-      return CompletableFuture.completedFuture(cached);
-    }
-    return collectionRefreshes.computeIfAbsent(
+    return collectionRefreshes.compute(
         collection,
-        key -> {
-          ExecutorService executor = threadPool;
-          CompletableFuture<DocCollection> future;
-          if (executor == null || ExecutorUtil.isShutdown(executor)) {
-            future = new CompletableFuture<>();
-            try {
-              future.complete(loadDocCollection(key));
-            } catch (Throwable t) {
-              future.completeExceptionally(t);
-            }
+        (key, existingFuture) -> {
+          // A refresh is still in progress; return it.
+          if (existingFuture != null && !existingFuture.isDone()) {
+            return existingFuture;
+          }

Review Comment:
   This is the essence of the fix.  Everything else in the PR is an improvement 
but non-critical.
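
   To illustrate the difference this hunk relies on, here is a minimal, self-contained sketch. It is not the CloudSolrClient code: `RefreshDedupSketch`, `refreshes`, `viaComputeIfAbsent`, and `viaCompute` are hypothetical names, and the futures carry plain strings. It only shows that `computeIfAbsent` hands back whatever is already mapped, even a completed future, while `compute` can look at the existing entry and replace it once it is done.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical stand-in for the refresh bookkeeping; not the CloudSolrClient code.
class RefreshDedupSketch {
  static final ConcurrentHashMap<String, CompletableFuture<String>> refreshes =
      new ConcurrentHashMap<>();

  // Old shape: any existing entry wins, even one that has already completed.
  static CompletableFuture<String> viaComputeIfAbsent(
      String key, Supplier<CompletableFuture<String>> start) {
    return refreshes.computeIfAbsent(key, k -> start.get());
  }

  // New shape: reuse the entry only while it is in flight; otherwise start a new one.
  static CompletableFuture<String> viaCompute(
      String key, Supplier<CompletableFuture<String>> start) {
    return refreshes.compute(
        key,
        (k, existing) -> (existing != null && !existing.isDone()) ? existing : start.get());
  }

  public static void main(String[] args) {
    CompletableFuture<String> stale = CompletableFuture.completedFuture("old state");
    refreshes.put("c1", stale);
    CompletableFuture<String> fresh = new CompletableFuture<>();

    // computeIfAbsent returns the completed entry and never calls the supplier.
    System.out.println(viaComputeIfAbsent("c1", () -> fresh) == stale);
    // compute sees that the entry is done and replaces it.
    System.out.println(viaCompute("c1", () -> fresh) == fresh);
    // A second caller joins the refresh that is still in flight.
    System.out.println(viaCompute("c1", () -> new CompletableFuture<String>()) == fresh);
  }
}
```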



##########
solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java:
##########
@@ -1658,41 +1657,34 @@ protected DocCollection getDocCollection(String collection, Integer expectedVers
   }
 
   private CompletableFuture<DocCollection> triggerCollectionRefresh(String collection) {
-    if (closed) {
-      ExpiringCachedDocCollection cacheEntry = collectionStateCache.peek(collection);
-      DocCollection cached = cacheEntry == null ? null : cacheEntry.cached;
-      return CompletableFuture.completedFuture(cached);
-    }
-    return collectionRefreshes.computeIfAbsent(
+    return collectionRefreshes.compute(
         collection,
-        key -> {
-          ExecutorService executor = threadPool;
-          CompletableFuture<DocCollection> future;
-          if (executor == null || ExecutorUtil.isShutdown(executor)) {
-            future = new CompletableFuture<>();
-            try {
-              future.complete(loadDocCollection(key));
-            } catch (Throwable t) {
-              future.completeExceptionally(t);
-            }
+        (key, existingFuture) -> {
+          // A refresh is still in progress; return it.
+          if (existingFuture != null && !existingFuture.isDone()) {
+            return existingFuture;
+          }
+          // No refresh is in-progress, so trigger it.
+
+          if (ExecutorUtil.isShutdown(threadPool)) {
+            assert closed; // see close() for the sequence
+            ExpiringCachedDocCollection cacheEntry = collectionStateCache.peek(key);
+            DocCollection cached = cacheEntry == null ? null : cacheEntry.cached;
+            return CompletableFuture.completedFuture(cached);
           } else {
-            future =
-                CompletableFuture.supplyAsync(
-                    () -> {
-                      stateRefreshSemaphore.acquireUninterruptibly();
-                      try {
-                        return loadDocCollection(key);
-                      } finally {
-                        stateRefreshSemaphore.release();
-                      }
-                    },
-                    executor);
+            return CompletableFuture.supplyAsync(
+                () -> {
+                  stateRefreshSemaphore.acquireUninterruptibly();
+                  try {
+                    return loadDocCollection(key);
+                  } finally {
+                    stateRefreshSemaphore.release();
+                    // Remove the entry in case of many collections
+                    collectionRefreshes.remove(key);
+                  }
+                },
+                threadPool);
           }
-          future.whenCompleteAsync(
-              (result, error) -> {
-                collectionRefreshes.remove(key, future);
-              });
-          return future;

Review Comment:
   It's much lighter weight and simpler to read when the removal is folded into the single lambda callback, so that it happens right after the collection is loaded.
   
   I previously spent a fair amount of time trying to assure myself about the nuances of whenComplete vs. whenCompleteAsync, about returning the result of this future vs. not, and about having the outer method do this instead. I played with a debugger to inspect threads, put sleeps in places, and ran tests. It was educational, but I concluded we can do something much simpler.
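
   A minimal sketch of the "clean up inside the task" shape, under stated assumptions: `InTaskCleanupSketch`, `inFlight`, `refresh`, and `demo` are hypothetical names, the latch stands in for a slow load, and the semaphore and closed-client branch from the real hunk are left out. Because the `finally` block runs before the supplier returns, the map entry is already gone by the time the future completes, with no separate completion callback involved.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch; not the CloudSolrClient code.
class InTaskCleanupSketch {
  static final ConcurrentHashMap<String, CompletableFuture<String>> inFlight =
      new ConcurrentHashMap<>();

  static CompletableFuture<String> refresh(String key, CountDownLatch gate, ExecutorService pool) {
    return inFlight.compute(
        key,
        (k, existing) -> {
          if (existing != null && !existing.isDone()) {
            return existing; // join the refresh that is already running
          }
          return CompletableFuture.supplyAsync(
              () -> {
                try {
                  gate.await(); // stands in for a slow load
                  return "state-of-" + k;
                } catch (InterruptedException e) {
                  Thread.currentThread().interrupt();
                  throw new IllegalStateException(e);
                } finally {
                  // Runs before the supplier returns, so before the future completes.
                  inFlight.remove(k);
                }
              },
              pool);
        });
  }

  // Returns true if the entry was tracked while running and gone after completion.
  static boolean demo() {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      CountDownLatch gate = new CountDownLatch(1);
      CompletableFuture<String> future = refresh("c1", gate, pool);
      boolean trackedWhileRunning = inFlight.get("c1") == future;
      gate.countDown();
      future.join();
      return trackedWhileRunning && !inFlight.containsKey("c1");
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

   The trade-off in this sketch is that the removal is tied to the task body rather than to the future's completion, which is what makes the whenComplete vs. whenCompleteAsync question go away.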



##########
solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java:
##########
@@ -1658,41 +1657,34 @@ protected DocCollection getDocCollection(String collection, Integer expectedVers
   }
 
   private CompletableFuture<DocCollection> triggerCollectionRefresh(String collection) {
-    if (closed) {
-      ExpiringCachedDocCollection cacheEntry = collectionStateCache.peek(collection);
-      DocCollection cached = cacheEntry == null ? null : cacheEntry.cached;
-      return CompletableFuture.completedFuture(cached);
-    }
-    return collectionRefreshes.computeIfAbsent(
+    return collectionRefreshes.compute(
         collection,
-        key -> {
-          ExecutorService executor = threadPool;
-          CompletableFuture<DocCollection> future;
-          if (executor == null || ExecutorUtil.isShutdown(executor)) {
-            future = new CompletableFuture<>();
-            try {
-              future.complete(loadDocCollection(key));
-            } catch (Throwable t) {
-              future.completeExceptionally(t);
-            }
+        (key, existingFuture) -> {
+          // A refresh is still in progress; return it.
+          if (existingFuture != null && !existingFuture.isDone()) {
+            return existingFuture;
+          }
+          // No refresh is in-progress, so trigger it.
+
+          if (ExecutorUtil.isShutdown(threadPool)) {
+            assert closed; // see close() for the sequence
+            ExpiringCachedDocCollection cacheEntry = collectionStateCache.peek(key);
+            DocCollection cached = cacheEntry == null ? null : cacheEntry.cached;
+            return CompletableFuture.completedFuture(cached);
           } else {
-            future =
-                CompletableFuture.supplyAsync(
-                    () -> {
-                      stateRefreshSemaphore.acquireUninterruptibly();
-                      try {
-                        return loadDocCollection(key);
-                      } finally {
-                        stateRefreshSemaphore.release();
-                      }
-                    },
-                    executor);
+            return CompletableFuture.supplyAsync(
+                () -> {
+                  stateRefreshSemaphore.acquireUninterruptibly();
+                  try {
+                    return loadDocCollection(key);
+                  } finally {
+                    stateRefreshSemaphore.release();
+                    // Remove the entry in case of many collections
+                    collectionRefreshes.remove(key);

Review Comment:
   By the way, this should always remove the same future, i.e. the one this task belongs to, so the one-argument remove(key) is enough here.
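
   For reference, a small sketch of how the two `ConcurrentHashMap` removal forms differ when the mapped value is not the caller's own. The names `RemoveVariantsSketch`, `mine`, and `other` are hypothetical. If the entry is always the task's own future, as stated above, the two forms behave the same; the sketch only shows the case where they would not.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of remove(key, value) vs. remove(key).
class RemoveVariantsSketch {
  // Returns {conditionalRemoved, entryStillPresent, unconditionalRemovedOther}.
  static boolean[] demo() {
    ConcurrentHashMap<String, CompletableFuture<String>> map = new ConcurrentHashMap<>();
    CompletableFuture<String> mine = new CompletableFuture<>();
    CompletableFuture<String> other = new CompletableFuture<>();
    map.put("c1", other);

    // Two-argument form: removes only if the key currently maps to 'mine'.
    boolean conditionalRemoved = map.remove("c1", mine);
    boolean entryStillPresent = map.get("c1") == other;

    // One-argument form: removes whatever is mapped and returns it.
    boolean unconditionalRemovedOther = map.remove("c1") == other;

    return new boolean[] {conditionalRemoved, entryStillPresent, unconditionalRemovedOther};
  }

  public static void main(String[] args) {
    boolean[] r = demo();
    System.out.println(r[0] + " " + r[1] + " " + r[2]);
  }
}
```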



##########
solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java:
##########
@@ -1658,41 +1657,34 @@ protected DocCollection getDocCollection(String collection, Integer expectedVers
   }
 
   private CompletableFuture<DocCollection> triggerCollectionRefresh(String collection) {
-    if (closed) {
-      ExpiringCachedDocCollection cacheEntry = collectionStateCache.peek(collection);
-      DocCollection cached = cacheEntry == null ? null : cacheEntry.cached;
-      return CompletableFuture.completedFuture(cached);
-    }

Review Comment:
   I found it confusing for this method to have two code paths for the closed scenario, so I centralized them in one spot within the map.compute call.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
