alievmirza commented on code in PR #1729:
URL: https://github.com/apache/ignite-3/pull/1729#discussion_r1134277628
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -209,17 +253,21 @@ public DistributionZoneManager(
this.logicalTopologyService = logicalTopologyService;
this.vaultMgr = vaultMgr;
- this.watchListener = createMetastorageListener();
+ this.topologyWatchListener = createMetastorageTopologyListener();
+
+ this.dataNodesWatchListener = createMetastorageDataNodesListener();
zonesState = new ConcurrentHashMap<>();
- logicalTopology = Collections.emptySet();
+ logicalTopology = emptySet();
executor = new ScheduledThreadPoolExecutor(
Math.min(Runtime.getRuntime().availableProcessors() * 3, 20),
new NamedThreadFactory(NamedThreadFactory.threadPrefix(nodeName, DISTRIBUTION_ZONE_MANAGER_POOL_NAME), LOG),
new ThreadPoolExecutor.DiscardPolicy()
);
+
+ dataNodes.put(DEFAULT_ZONE_ID, new DataNodes());
Review Comment:
`dataNodes` is not thread-safe, and you put a value into it without
synchronization. Also, I would move the creation of `dataNodes` from the class
field initializer to the constructor.
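A minimal sketch of what this suggestion could look like, assuming the map stays a plain `HashMap` guarded by `dataNodesMutex` as elsewhere in the PR. `ZoneNodes`, `ZoneDataNodesHolder` and the value of `DEFAULT_ZONE_ID` are hypothetical stand-ins, not the PR's actual classes:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import static java.util.Collections.emptySet;

/** Hypothetical stand-in for the PR's per-zone DataNodes holder. */
class ZoneNodes {
    private final Set<String> nodes;

    ZoneNodes(Set<String> nodes) {
        this.nodes = nodes;
    }

    Set<String> nodes() {
        return nodes;
    }
}

/** Hypothetical, trimmed-down manager: only the dataNodes handling is shown. */
class ZoneDataNodesHolder {
    /** Assumed value; the real constant lives in the distribution zones module. */
    static final int DEFAULT_ZONE_ID = 0;

    /** Guards every read and write of {@link #dataNodes}. */
    private final Object dataNodesMutex = new Object();

    /** No initializer at the declaration: the map is created in the constructor. */
    private final Map<Integer, ZoneNodes> dataNodes;

    ZoneDataNodesHolder() {
        dataNodes = new HashMap<>();

        // Take the same lock as every other accessor of the map.
        synchronized (dataNodesMutex) {
            dataNodes.put(DEFAULT_ZONE_ID, new ZoneNodes(emptySet()));
        }
    }

    Set<String> nodes(int zoneId) {
        synchronized (dataNodesMutex) {
            ZoneNodes zoneNodes = dataNodes.get(zoneId);

            return zoneNodes == null ? emptySet() : zoneNodes.nodes();
        }
    }
}
```

Because the field is `final` and filled in the constructor, it is safely published anyway; taking the lock mainly keeps every access on the same monitor. A `ConcurrentHashMap` would be the alternative if the map never has to be updated atomically together with other state such as `lastTopVer`.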
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -480,6 +550,183 @@ public int getZoneId(String name) {
}
}
+ /**
+ * The method for obtaining data nodes of the specified zone.
+ * If the {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp}
+ * or {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown}
+ * equals to 0 then returned completable future will be completed when data nodes will be saved to the meta storage.
+ *
+ * @param zoneId Zone id.
+ * @param topVer Topology version.
+ * @return The data nodes future.
+ */
+ public CompletableFuture<Set<String>> getDataNodes(int zoneId, long topVer) {
Review Comment:
What should this method return when scale up / down are non-zero (i.e. not
immediate) and we restart some node? The scenario: we had a cluster with data
nodes [A, B, C], C was restarted, and after the restart some table is created
from C. Which data nodes should we get to calculate the assignments? In your
code it seems that we will return empty data nodes; is that the correct
behaviour?
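One possible direction, sketched under assumptions: keep the last saved data nodes in the meta storage and, when the in-memory entry is missing after a restart, read them back instead of defaulting to an empty set. `PersistedDataNodes` and `RestartAwareDataNodes` are hypothetical; a real implementation would go through whatever meta storage or vault API the manager already uses:

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

import static java.util.Collections.emptySet;
import static java.util.concurrent.CompletableFuture.completedFuture;

/** Hypothetical read access to the data nodes last saved to the meta storage. */
interface PersistedDataNodes {
    /** Returns the saved data nodes, or empty if nothing was ever saved for the zone. */
    Optional<Set<String>> read(int zoneId);
}

/** Hypothetical lookup for zones with non-immediate scale up/down. */
class RestartAwareDataNodes {
    /** In-memory state; it is empty right after a node restart. */
    private final Map<Integer, Set<String>> inMemory = new ConcurrentHashMap<>();

    private final PersistedDataNodes persisted;

    RestartAwareDataNodes(PersistedDataNodes persisted) {
        this.persisted = persisted;
    }

    void update(int zoneId, Set<String> nodes) {
        inMemory.put(zoneId, nodes);
    }

    CompletableFuture<Set<String>> dataNodes(int zoneId) {
        Set<String> nodes = inMemory.get(zoneId);

        if (nodes != null) {
            return completedFuture(nodes);
        }

        // Nothing in memory (e.g. node C just restarted): fall back to the saved value.
        // Only a zone whose data nodes were never saved (first cluster start) yields an empty set.
        return completedFuture(persisted.read(zoneId).orElse(emptySet()));
    }
}
```

With this split, a restarted node C would answer [A, B, C] for the table it creates, while a brand-new cluster with non-immediate scale up/down still gets an empty set.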
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -480,6 +550,183 @@ public int getZoneId(String name) {
}
}
+ /**
+ * The method for obtaining data nodes of the specified zone.
+ * If the {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp}
+ * or {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown}
+ * equals to 0 then returned completable future will be completed when data nodes will be saved to the meta storage.
+ *
+ * @param zoneId Zone id.
+ * @param topVer Topology version.
+ * @return The data nodes future.
+ */
+ public CompletableFuture<Set<String>> getDataNodes(int zoneId, long topVer) {
+ CompletableFuture<Set<String>> dataNodesFut = new CompletableFuture<>();
+
+ CompletableFuture<Void> topVerFut;
+
+ synchronized (dataNodesMutex) {
+ dataNodes.putIfAbsent(zoneId, new DataNodes());
+
+ if (topVer > lastTopVer) {
+ topVerFut = topVerFutures.computeIfAbsent(topVer, key -> new CompletableFuture<>());
+ } else {
+ topVerFut = completedFuture(null);
+ }
+ }
+
+ boolean immediateScaleUp0 = false;
+ boolean immediateScaleDown0 = false;
+
+ if (zoneId == DEFAULT_ZONE_ID) {
+ immediateScaleUp0 = zonesConfiguration.defaultDistributionZone().dataNodesAutoAdjustScaleUp().value() == 0;
+ immediateScaleDown0 = zonesConfiguration.defaultDistributionZone().dataNodesAutoAdjustScaleDown().value() == 0;
+ } else {
+ NamedConfigurationTree<DistributionZoneConfiguration, DistributionZoneView, DistributionZoneChange> zones =
+ zonesConfiguration.distributionZones();
+
+ for (int i = 0; i < zones.value().size(); i++) {
+ DistributionZoneView zone = zones.value().get(i);
+
+ if (zone.zoneId() == zoneId) {
+ immediateScaleUp0 = zone.dataNodesAutoAdjustScaleUp() == 0;
+ immediateScaleDown0 = zone.dataNodesAutoAdjustScaleDown() == 0;
+
+ break;
+ }
+ }
+ }
+
+ boolean immediateScaleUp = immediateScaleUp0;
+ boolean immediateScaleDown = immediateScaleDown0;
+
+ if (!immediateScaleUp && !immediateScaleDown) {
+ synchronized (dataNodesMutex) {
+
+ DataNodes dataNodes = this.dataNodes.get(zoneId);
+
+ Set<String> nodes;
+
+ if (dataNodes == null) {
+ nodes = emptySet();
+ } else {
+ nodes = this.dataNodes.get(zoneId).nodes();
+ }
+
+
+ return completedFuture(nodes);
+ }
+ }
+
+ CompletableFuture<Void> topVerScaleUpFut = new CompletableFuture<>();
+
+ topVerFut.thenAccept(ignored -> {
+ synchronized (dataNodesMutex) {
+ CompletableFuture<Void> topVerScaleUpFut0 = null;
+
+ if (immediateScaleUp) {
+ Map.Entry<Long, Long> scaleUpRevisionEntry = topVerAndScaleUpRevision.ceilingEntry(topVer);
+
+ Long scaleUpRevision = null;
+
+ if (scaleUpRevisionEntry != null) {
+ scaleUpRevision = scaleUpRevisionEntry.getValue();
+ }
+
+ if (scaleUpRevision != null && dataNodes.get(zoneId).scaleUpRevision() < scaleUpRevision) {
Review Comment:
It seems possible that the zone was deleted while we were waiting for
`topVerFut` to complete, so there is a potential NPE here.
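A sketch of a null-safe variant of the continuation, assuming the zone entry is re-read under `dataNodesMutex` once `topVerFut` completes. `NullSafeZoneLookup`, `putZone`/`removeZone` and the choice of `IllegalStateException` are hypothetical; completing with an empty set would be an equally valid policy:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/** Hypothetical holder illustrating a null-safe read after topVerFut completes. */
class NullSafeZoneLookup {
    private final Object dataNodesMutex = new Object();

    /** Guarded by dataNodesMutex; zones may be removed concurrently. */
    private final Map<Integer, Set<String>> dataNodes = new HashMap<>();

    void putZone(int zoneId, Set<String> nodes) {
        synchronized (dataNodesMutex) {
            dataNodes.put(zoneId, nodes);
        }
    }

    void removeZone(int zoneId) {
        synchronized (dataNodesMutex) {
            dataNodes.remove(zoneId);
        }
    }

    CompletableFuture<Set<String>> dataNodesAfter(CompletableFuture<Void> topVerFut, int zoneId) {
        return topVerFut.thenApply(ignored -> {
            synchronized (dataNodesMutex) {
                // Re-read under the lock: the zone may have been dropped while
                // topVerFut was pending, so the result must be checked before use.
                Set<String> nodes = dataNodes.get(zoneId);

                if (nodes == null) {
                    // thenApply turns this into an exceptionally completed future instead of an NPE.
                    throw new IllegalStateException("Distribution zone was removed [zoneId=" + zoneId + ']');
                }

                return nodes;
            }
        });
    }
}
```

Whether a removed zone should fail the future or yield an empty set is a policy decision for the PR; the point is only that `dataNodes.get(zoneId)` must not be dereferenced unchecked inside the callback.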
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -480,6 +550,183 @@ public int getZoneId(String name) {
}
}
+ /**
+ * The method for obtaining data nodes of the specified zone.
+ * If the {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp}
+ * or {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown}
+ * equals to 0 then returned completable future will be completed when data nodes will be saved to the meta storage.
+ *
+ * @param zoneId Zone id.
+ * @param topVer Topology version.
+ * @return The data nodes future.
+ */
+ public CompletableFuture<Set<String>> getDataNodes(int zoneId, long topVer) {
Review Comment:
Note that I'm not talking about the first start of the cluster, where we
agreed to return empty data nodes as long as the user has set non-immediate
scale up/down.
--
This is an automated message from the Apache Git Service.