alievmirza commented on code in PR #1729:
URL: https://github.com/apache/ignite-3/pull/1729#discussion_r1134277628


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -209,17 +253,21 @@ public DistributionZoneManager(
         this.logicalTopologyService = logicalTopologyService;
         this.vaultMgr = vaultMgr;
 
-        this.watchListener = createMetastorageListener();
+        this.topologyWatchListener = createMetastorageTopologyListener();
+
+        this.dataNodesWatchListener = createMetastorageDataNodesListener();
 
         zonesState = new ConcurrentHashMap<>();
 
-        logicalTopology = Collections.emptySet();
+        logicalTopology = emptySet();
 
         executor = new ScheduledThreadPoolExecutor(
                 Math.min(Runtime.getRuntime().availableProcessors() * 3, 20),
                 new NamedThreadFactory(NamedThreadFactory.threadPrefix(nodeName, DISTRIBUTION_ZONE_MANAGER_POOL_NAME), LOG),
                 new ThreadPoolExecutor.DiscardPolicy()
         );
+
+        dataNodes.put(DEFAULT_ZONE_ID, new DataNodes());

Review Comment:
   `dataNodes` is not thread-safe, and you put a value into it without
   synchronization. Also, I would move the creation of `dataNodes` from the
   class field to the constructor.
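
   A minimal sketch of the suggestion, assuming `dataNodes` is a plain `HashMap` keyed by zone id and guarded by `dataNodesMutex`. The class names `DataNodesSketch` and `ZoneManagerSketch`, the `hasZone` helper, and the value of `DEFAULT_ZONE_ID` are hypothetical stand-ins, not the PR's actual code:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the PR's DataNodes holder. */
class DataNodesSketch {
    private final Set<String> nodes = ConcurrentHashMap.newKeySet();

    Set<String> nodes() {
        return nodes;
    }
}

/** Hypothetical, simplified manager: only the map handling is shown. */
class ZoneManagerSketch {
    /** Assumed value, for illustration only. */
    static final int DEFAULT_ZONE_ID = 0;

    private final Object dataNodesMutex = new Object();

    /** Assigned in the constructor instead of at the field declaration. */
    private final Map<Integer, DataNodesSketch> dataNodes;

    ZoneManagerSketch() {
        dataNodes = new HashMap<>();

        // Every access to the non-thread-safe map goes through the same mutex,
        // including the initial put in the constructor.
        synchronized (dataNodesMutex) {
            dataNodes.put(DEFAULT_ZONE_ID, new DataNodesSketch());
        }
    }

    /** Hypothetical helper that reads the map under the same mutex. */
    boolean hasZone(int zoneId) {
        synchronized (dataNodesMutex) {
            return dataNodes.containsKey(zoneId);
        }
    }
}
```

   Switching the field to a `ConcurrentHashMap` would be another option, but since the rest of the method already takes `dataNodesMutex`, reusing that mutex keeps a single locking rule for the map.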



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -480,6 +550,183 @@ public int getZoneId(String name) {
         }
     }
 
+    /**
+     * The method for obtaining data nodes of the specified zone.
+     * If the {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp}
+     * or {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown}
+     * equals to 0 then returned completable future will be completed when data nodes will be saved to the meta storage.
+     *
+     * @param zoneId Zone id.
+     * @param topVer Topology version.
+     * @return The data nodes future.
+     */
+    public CompletableFuture<Set<String>> getDataNodes(int zoneId, long topVer) {

Review Comment:
   What should this method return when scale up/down are non-zero (i.e.
   non-immediate) and some node is restarted? The scenario: we had a cluster
   with data nodes [A, B, C], C was restarted, and after the restart a table is
   created from C. Which data nodes should we get to calculate assignments? In
   your code, it seems that we would return empty data nodes. Is that the
   correct behaviour?



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -480,6 +550,183 @@ public int getZoneId(String name) {
         }
     }
 
+    /**
+     * The method for obtaining data nodes of the specified zone.
+     * If the {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp}
+     * or {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown}
+     * equals to 0 then returned completable future will be completed when data nodes will be saved to the meta storage.
+     *
+     * @param zoneId Zone id.
+     * @param topVer Topology version.
+     * @return The data nodes future.
+     */
+    public CompletableFuture<Set<String>> getDataNodes(int zoneId, long topVer) {
+        CompletableFuture<Set<String>> dataNodesFut = new CompletableFuture<>();
+
+        CompletableFuture<Void> topVerFut;
+
+        synchronized (dataNodesMutex) {
+            dataNodes.putIfAbsent(zoneId, new DataNodes());
+
+            if (topVer > lastTopVer) {
+                topVerFut = topVerFutures.computeIfAbsent(topVer, key -> new CompletableFuture<>());
+            } else {
+                topVerFut = completedFuture(null);
+            }
+        }
+
+        boolean immediateScaleUp0 = false;
+        boolean immediateScaleDown0 = false;
+
+        if (zoneId == DEFAULT_ZONE_ID) {
+            immediateScaleUp0 = zonesConfiguration.defaultDistributionZone().dataNodesAutoAdjustScaleUp().value() == 0;
+            immediateScaleDown0 = zonesConfiguration.defaultDistributionZone().dataNodesAutoAdjustScaleDown().value() == 0;
+        } else {
+            NamedConfigurationTree<DistributionZoneConfiguration, DistributionZoneView, DistributionZoneChange> zones =
+                    zonesConfiguration.distributionZones();
+
+            for (int i = 0; i < zones.value().size(); i++) {
+                DistributionZoneView zone = zones.value().get(i);
+
+                if (zone.zoneId() == zoneId) {
+                    immediateScaleUp0 = zone.dataNodesAutoAdjustScaleUp() == 0;
+                    immediateScaleDown0 = zone.dataNodesAutoAdjustScaleDown() == 0;
+
+                    break;
+                }
+            }
+        }
+
+        boolean immediateScaleUp = immediateScaleUp0;
+        boolean immediateScaleDown = immediateScaleDown0;
+
+        if (!immediateScaleUp && !immediateScaleDown) {
+            synchronized (dataNodesMutex) {
+
+                DataNodes dataNodes = this.dataNodes.get(zoneId);
+
+                Set<String> nodes;
+
+                if (dataNodes == null) {
+                    nodes = emptySet();
+                } else {
+                    nodes = this.dataNodes.get(zoneId).nodes();
+                }
+
+
+                return completedFuture(nodes);
+            }
+        }
+
+        CompletableFuture<Void> topVerScaleUpFut = new CompletableFuture<>();
+
+        topVerFut.thenAccept(ignored -> {
+            synchronized (dataNodesMutex) {
+                CompletableFuture<Void> topVerScaleUpFut0 = null;
+
+                if (immediateScaleUp) {
+                    Map.Entry<Long, Long> scaleUpRevisionEntry = topVerAndScaleUpRevision.ceilingEntry(topVer);
+
+                    Long scaleUpRevision = null;
+
+                    if (scaleUpRevisionEntry != null) {
+                        scaleUpRevision = scaleUpRevisionEntry.getValue();
+                    }
+
+                    if (scaleUpRevision != null && dataNodes.get(zoneId).scaleUpRevision() < scaleUpRevision) {

Review Comment:
   It seems possible that the zone was deleted while we were waiting for
   `topVerFut` to complete. In that case `dataNodes.get(zoneId)` returns null,
   so there is a potential NPE here.
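
   One possible guard, sketched under assumptions: the map is re-read under the mutex inside the callback, and a missing entry is treated as "zone was removed". The class `DeletedZoneSketch`, its `putZone`/`removeZone` helpers, and the choice to fail the future with `IllegalStateException` are all hypothetical; the PR may prefer a different outcome, such as a dedicated exception type.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified model of the lookup after the topology future completes. */
class DeletedZoneSketch {
    private final Object dataNodesMutex = new Object();

    private final Map<Integer, Set<String>> dataNodes = new HashMap<>();

    void putZone(int zoneId, Set<String> nodes) {
        synchronized (dataNodesMutex) {
            dataNodes.put(zoneId, nodes);
        }
    }

    void removeZone(int zoneId) {
        synchronized (dataNodesMutex) {
            dataNodes.remove(zoneId);
        }
    }

    CompletableFuture<Set<String>> getDataNodes(int zoneId, CompletableFuture<Void> topVerFut) {
        return topVerFut.thenApply(ignored -> {
            synchronized (dataNodesMutex) {
                // Re-read the entry: the zone may have been removed while we waited.
                Set<String> nodes = dataNodes.get(zoneId);

                if (nodes == null) {
                    // Assumed policy: fail the returned future instead of hitting an NPE.
                    throw new IllegalStateException("Zone was removed [zoneId=" + zoneId + ']');
                }

                return nodes;
            }
        });
    }
}
```

   With this shape, a caller that raced with zone deletion gets an exceptionally completed future rather than an NPE thrown inside the callback.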



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -480,6 +550,183 @@ public int getZoneId(String name) {
         }
     }
 
+    /**
+     * The method for obtaining data nodes of the specified zone.
+     * If the {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp}
+     * or {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown}
+     * equals to 0 then returned completable future will be completed when data nodes will be saved to the meta storage.
+     *
+     * @param zoneId Zone id.
+     * @param topVer Topology version.
+     * @return The data nodes future.
+     */
+    public CompletableFuture<Set<String>> getDataNodes(int zoneId, long topVer) {

Review Comment:
   Note that I'm not talking about the first start of the cluster, where we
   agreed to return empty data nodes as long as the user has set non-immediate
   scale up/down.


