petrov-mg commented on code in PR #11003:
URL: https://github.com/apache/ignite/pull/11003#discussion_r1387177190


##########
modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/service/ClientServiceMappingsResponse.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.service;
+
+import java.util.Collection;
+import java.util.UUID;
+import org.apache.ignite.internal.binary.BinaryRawWriterEx;
+import org.apache.ignite.internal.client.thin.ClientUtils;
+import 
org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+import org.jetbrains.annotations.Nullable;
+
+/** Service topology response. */
+public class ClientServiceMappingsResponse extends ClientResponse {
+    /** Service instance nodes. */
+    private final @Nullable Collection<UUID> svcsNodes;

Review Comment:
   `@Nullable` is misordered.



##########
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientServicesImpl.java:
##########
@@ -215,14 +316,33 @@ private ServiceInvocationHandler(
 
                 return ch.service(ClientOperation.SERVICE_INVOKE,
                     req -> writeServiceInvokeRequest(req, nodeIds, method, 
args),
-                    res -> utils.readObject(res.in(), false, 
method.getReturnType())
+                    res -> utils.readObject(res.in(), false, 
method.getReturnType()),
+                    serviceTopology()
                 );
             }
             catch (ClientError e) {
                 throw new ClientException(e);
             }
         }
 
+        /**
+         * If the partition awareness is enabled, notifies the service 
topology update and provides last knows service

Review Comment:
   `knows` - typo.



##########
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientServicesImpl.java:
##########
@@ -165,7 +185,88 @@ private ClientServiceDescriptorImpl 
readServiceDescriptor(BinaryReaderExImpl rea
     ClientServices withClusterGroup(ClientClusterGroupImpl grp) {
         A.notNull(grp, "grp");
 
-        return new ClientServicesImpl(ch, marsh, grp);
+        return new ClientServicesImpl(ch, marsh, grp, log);
+    }
+
+    /**
+     * Keeps topology of certain service and its update progress meta.
+     */
+    private final class ServiceTopology {
+        /** The service name. */
+        private final String srvcName;
+
+        /** If {@code true}, topology update of current service is in 
progress. */
+        private final AtomicBoolean updateInProgress = new AtomicBoolean();
+
+        /** Time of the last received topology. */
+        private volatile long lastUpdateRequestTime;
+
+        /** UUID of the nodes with at least one service instance. */
+        private volatile List<UUID> nodes = Collections.emptyList();
+
+        /** Last cluster topology version when current service topology was 
actual. */
+        private volatile AffinityTopologyVersion lastAffTop;
+
+        /** */
+        private ServiceTopology(String name) {
+            srvcName = name;
+        }
+
+        /**
+         * @return {@code True} if update of the service topology is required. 
{@code False} otherwise.
+         */
+        private boolean isUpdateRequired(AffinityTopologyVersion curAffTop) {
+            if (!ch.applyOnDefaultChannel(c -> 
c.protocolCtx().isFeatureSupported(ProtocolBitmaskFeature.SERVICE_TOPOLOGY), 
null))
+                return false;
+
+            return (lastAffTop == null || curAffTop.topologyVersion() > 
lastAffTop.topologyVersion()
+                || U.nanosToMillis(System.nanoTime() - lastUpdateRequestTime) 
>= SRV_TOP_UPDATE_PERIOD)
+                && updateInProgress.compareAndSet(false, true);
+        }
+
+        /**
+         * Asynchronously requests the service topology if the partition 
awareness is enabled and if the update is requred.
+         */
+        private void tryUpdate() {
+            AffinityTopologyVersion curAffTop = 
ch.affinityContext().lastTopology().version();
+
+            if (!isUpdateRequired(curAffTop))
+                return;
+
+            ch.serviceAsync(
+                ClientOperation.SERVICE_GET_TOPOLOGY,
+                req -> utils.writeObject(req.out(), srvcName),
+                resp -> {
+                    int cnt = resp.in().readInt();
+
+                    List<UUID> res = new ArrayList<>(cnt);
+
+                    for (int i = 0; i < cnt; ++i)
+                        res.add(new UUID(resp.in().readLong(), 
resp.in().readLong()));
+
+                    return res;
+                }).whenComplete((nodes, t) -> {
+                    if (t == null) {
+                        set(nodes, curAffTop);
+
+                        if (log.isDebugEnabled()) {
+                            log.debug("Topology of service '" + srvcName + "' 
has been updated. The " +
+                                "service instance nodes: " + nodes);
+                        }
+                    }
+                    else
+                        log.error("Failed to update topology of the service '" 
+ srvcName + "'.", t);
+
+                    updateInProgress.set(false);
+                });
+        }
+
+        /** Stores last known service topology. */
+        private void set(List<UUID> nodes, AffinityTopologyVersion lastAffTop) 
{

Review Comment:
   I don't think it's worth creating a separate method just to assign values 
to a couple of local variables. It is up to you though.



##########
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientServicesImpl.java:
##########
@@ -165,7 +185,88 @@ private ClientServiceDescriptorImpl 
readServiceDescriptor(BinaryReaderExImpl rea
     ClientServices withClusterGroup(ClientClusterGroupImpl grp) {
         A.notNull(grp, "grp");
 
-        return new ClientServicesImpl(ch, marsh, grp);
+        return new ClientServicesImpl(ch, marsh, grp, log);
+    }
+
+    /**
+     * Keeps topology of certain service and its update progress meta.
+     */
+    private final class ServiceTopology {
+        /** The service name. */
+        private final String srvcName;
+
+        /** If {@code true}, topology update of current service is in 
progress. */
+        private final AtomicBoolean updateInProgress = new AtomicBoolean();
+
+        /** Time of the last received topology. */
+        private volatile long lastUpdateRequestTime;
+
+        /** UUID of the nodes with at least one service instance. */
+        private volatile List<UUID> nodes = Collections.emptyList();
+
+        /** Last cluster topology version when current service topology was 
actual. */
+        private volatile AffinityTopologyVersion lastAffTop;
+
+        /** */
+        private ServiceTopology(String name) {
+            srvcName = name;
+        }
+
+        /**
+         * @return {@code True} if update of the service topology is required. 
{@code False} otherwise.
+         */
+        private boolean isUpdateRequired(AffinityTopologyVersion curAffTop) {
+            if (!ch.applyOnDefaultChannel(c -> 
c.protocolCtx().isFeatureSupported(ProtocolBitmaskFeature.SERVICE_TOPOLOGY), 
null))

Review Comment:
   I would try to split these checks somehow. Currently this method definitely 
does more than answering the question of whether a topology update is required.
   
   
   I also suggest not splitting `updateInProgress` update into two different 
methods.



##########
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientServicesImpl.java:
##########
@@ -165,7 +185,88 @@ private ClientServiceDescriptorImpl 
readServiceDescriptor(BinaryReaderExImpl rea
     ClientServices withClusterGroup(ClientClusterGroupImpl grp) {
         A.notNull(grp, "grp");
 
-        return new ClientServicesImpl(ch, marsh, grp);
+        return new ClientServicesImpl(ch, marsh, grp, log);
+    }
+
+    /**
+     * Keeps topology of certain service and its update progress meta.
+     */
+    private final class ServiceTopology {
+        /** The service name. */
+        private final String srvcName;
+
+        /** If {@code true}, topology update of current service is in 
progress. */
+        private final AtomicBoolean updateInProgress = new AtomicBoolean();
+
+        /** Time of the last received topology. */
+        private volatile long lastUpdateRequestTime;
+
+        /** UUID of the nodes with at least one service instance. */
+        private volatile List<UUID> nodes = Collections.emptyList();
+
+        /** Last cluster topology version when current service topology was 
actual. */
+        private volatile AffinityTopologyVersion lastAffTop;
+
+        /** */
+        private ServiceTopology(String name) {
+            srvcName = name;
+        }
+
+        /**
+         * @return {@code True} if update of the service topology is required. 
{@code False} otherwise.
+         */
+        private boolean isUpdateRequired(AffinityTopologyVersion curAffTop) {
+            if (!ch.applyOnDefaultChannel(c -> 
c.protocolCtx().isFeatureSupported(ProtocolBitmaskFeature.SERVICE_TOPOLOGY), 
null))
+                return false;
+
+            return (lastAffTop == null || curAffTop.topologyVersion() > 
lastAffTop.topologyVersion()
+                || U.nanosToMillis(System.nanoTime() - lastUpdateRequestTime) 
>= SRV_TOP_UPDATE_PERIOD)
+                && updateInProgress.compareAndSet(false, true);
+        }
+
+        /**
+         * Asynchronously requests the service topology if the partition 
awareness is enabled and if the update is requred.
+         */
+        private void tryUpdate() {
+            AffinityTopologyVersion curAffTop = 
ch.affinityContext().lastTopology().version();
+
+            if (!isUpdateRequired(curAffTop))
+                return;
+
+            ch.serviceAsync(
+                ClientOperation.SERVICE_GET_TOPOLOGY,
+                req -> utils.writeObject(req.out(), srvcName),
+                resp -> {
+                    int cnt = resp.in().readInt();
+
+                    List<UUID> res = new ArrayList<>(cnt);
+
+                    for (int i = 0; i < cnt; ++i)
+                        res.add(new UUID(resp.in().readLong(), 
resp.in().readLong()));
+
+                    return res;
+                }).whenComplete((nodes, t) -> {
+                    if (t == null) {
+                        set(nodes, curAffTop);
+
+                        if (log.isDebugEnabled()) {
+                            log.debug("Topology of service '" + srvcName + "' 
has been updated. The " +
+                                "service instance nodes: " + nodes);
+                        }
+                    }
+                    else
+                        log.error("Failed to update topology of the service '" 
+ srvcName + "'.", t);
+
+                    updateInProgress.set(false);
+                });
+        }
+
+        /** Stores last known service topology. */
+        private void set(List<UUID> nodes, AffinityTopologyVersion lastAffTop) 
{

Review Comment:
   I don't think it's worth creating a separate method just to assign values 
to a couple of local variables. It is up to you though.



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java:
##########
@@ -936,7 +936,7 @@ public IgniteInternalFuture<?> cancelAll(@NotNull 
Collection<String> servicesNam
      * @return Service topology.
      * @throws IgniteCheckedException On error.
      */
-    public Map<UUID, Integer> serviceTopology(String name, long timeout) 
throws IgniteCheckedException {
+    public @Nullable Map<UUID, Integer> serviceTopology(String name, long 
timeout) throws IgniteCheckedException {

Review Comment:
   `@Nullable` is misordered.



##########
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientServicesImpl.java:
##########
@@ -215,14 +316,33 @@ private ServiceInvocationHandler(
 
                 return ch.service(ClientOperation.SERVICE_INVOKE,
                     req -> writeServiceInvokeRequest(req, nodeIds, method, 
args),
-                    res -> utils.readObject(res.in(), false, 
method.getReturnType())
+                    res -> utils.readObject(res.in(), false, 
method.getReturnType()),
+                    serviceTopology()
                 );
             }
             catch (ClientError e) {
                 throw new ClientException(e);
             }
         }
 
+        /**
+         * If the partition awareness is enabled, notifies the service 
topology update and provides last knows service
+         * nodes.
+         *
+         * @return Last known topology nodes.
+         */
+        private List<UUID> serviceTopology() {
+            if (ch.partitionAwarenessEnabled) {
+                ServiceTopology srvcTop = 
servicesTopologies.computeIfAbsent(name, ServiceTopology::new);
+
+                srvcTop.tryUpdate();

Review Comment:
   IMHO, it will be a bit clearer if ServiceTopology only provides something 
like a `getTopologyNodes` method and all the update logic is encapsulated 
inside ServiceTopology.



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/service/ClientServiceTopologyRequest.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.service;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.binary.BinaryRawReader;
+import 
org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
+import org.apache.ignite.internal.processors.platform.client.ClientRequest;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+import org.apache.ignite.internal.processors.platform.client.ClientStatus;
+import 
org.apache.ignite.internal.processors.platform.client.IgniteClientException;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * Request topology of certain service.
+ */
+public class ClientServiceTopologyRequest extends ClientRequest {
+    /** The service name. */
+    private final String name;
+
+    /**
+     * Ctor.

Review Comment:
   Let's change this to something meaningful.



##########
modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java:
##########
@@ -159,7 +159,7 @@ private TcpIgniteClient(ClientConfiguration cfg) throws 
ClientException {
 
             compute = new ClientComputeImpl(ch, marsh, 
cluster.defaultClusterGroup());
 
-            services = new ClientServicesImpl(ch, marsh, 
cluster.defaultClusterGroup());
+            services = new ClientServicesImpl(ch, marsh, 
cluster.defaultClusterGroup(), NullLogger.whenNull(cfg.getLogger()));

Review Comment:
   It seems we can create a special variable for the logger in 
`TcpIgniteClient`.



##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java:
##########
@@ -30,8 +30,10 @@
 import java.util.stream.Stream;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.Ignite;

Review Comment:
   Seems like orphaned changes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to