petrov-mg commented on code in PR #11003:
URL: https://github.com/apache/ignite/pull/11003#discussion_r1387920888


##########
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientServicesImpl.java:
##########
@@ -165,7 +185,84 @@ private ClientServiceDescriptorImpl 
readServiceDescriptor(BinaryReaderExImpl rea
     ClientServices withClusterGroup(ClientClusterGroupImpl grp) {
         A.notNull(grp, "grp");
 
-        return new ClientServicesImpl(ch, marsh, grp);
+        return new ClientServicesImpl(ch, marsh, grp, log);
+    }
+
+    /**
+     * Keeps topology of certain service and its update progress meta.
+     */
+    private final class ServiceTopology {
+        /** The service name. */
+        private final String srvcName;
+
+        /** If {@code true}, topology update of current service is in 
progress. */
+        private final AtomicBoolean updateInProgress = new AtomicBoolean();
+
+        /** Time of the last received topology. */
+        private volatile long lastUpdateRequestTime;
+
+        /** UUID of the nodes with at least one service instance. */
+        private volatile List<UUID> nodes = Collections.emptyList();
+
+        /** Last cluster topology version when current service topology was 
actual. */
+        private volatile AffinityTopologyVersion lastAffTop;
+
+        /** */
+        private ServiceTopology(String name) {
+            srvcName = name;
+        }
+
+        /**
+         * @return {@code True} if update of the service topology is required. 
{@code False} otherwise.
+         */
+        private boolean isUpdateRequired(AffinityTopologyVersion curAffTop) {
+            return (lastAffTop == null || curAffTop.topologyVersion() > 
lastAffTop.topologyVersion()
+                || U.nanosToMillis(System.nanoTime() - lastUpdateRequestTime) 
>= SRV_TOP_UPDATE_PERIOD)
+                && updateInProgress.compareAndSet(false, true);
+        }
+
+        /**
+         * Provides current known service topology and asynchronously 
initiates service topology update if required.
+         *
+         * @return Current known service topology.
+         */
+        private List<UUID> getAndUpdate() {
+            AffinityTopologyVersion curAffTop = 
ch.affinityContext().lastTopology().version();
+
+            if (!isUpdateRequired(curAffTop))
+                return nodes;
+
+            ch.serviceAsync(
+                ClientOperation.SERVICE_GET_TOPOLOGY,
+                req -> utils.writeObject(req.out(), srvcName),
+                resp -> {
+                    int cnt = resp.in().readInt();
+
+                    List<UUID> res = new ArrayList<>(cnt);
+
+                    for (int i = 0; i < cnt; ++i)
+                        res.add(new UUID(resp.in().readLong(), 
resp.in().readLong()));
+
+                    return res;
+                }).whenComplete((nodes, t) -> {

Review Comment:
   Let's rename `t` to `err` or something like that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to