sergeyuttsel commented on code in PR #1729:
URL: https://github.com/apache/ignite-3/pull/1729#discussion_r1135744172
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -480,6 +550,183 @@ public int getZoneId(String name) {
}
}
+ /**
+ * Obtains the data nodes of the specified zone.
+ * If either {@link
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp}
+ * or {@link
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown}
+ * is equal to 0, then the returned completable future is completed once
the data nodes have been saved to the meta storage.
+ *
+ * @param zoneId Zone id.
+ * @param topVer Topology version.
+ * @return The data nodes future.
+ */
+ public CompletableFuture<Set<String>> getDataNodes(int zoneId, long
topVer) {
+ CompletableFuture<Set<String>> dataNodesFut = new
CompletableFuture<>();
+
+ CompletableFuture<Void> topVerFut;
+
+ synchronized (dataNodesMutex) {
+ dataNodes.putIfAbsent(zoneId, new DataNodes());
+
+ if (topVer > lastTopVer) {
+ topVerFut = topVerFutures.computeIfAbsent(topVer, key -> new
CompletableFuture<>());
+ } else {
+ topVerFut = completedFuture(null);
+ }
+ }
+
+ boolean immediateScaleUp0 = false;
+ boolean immediateScaleDown0 = false;
+
+ if (zoneId == DEFAULT_ZONE_ID) {
+ immediateScaleUp0 =
zonesConfiguration.defaultDistributionZone().dataNodesAutoAdjustScaleUp().value()
== 0;
+ immediateScaleDown0 =
zonesConfiguration.defaultDistributionZone().dataNodesAutoAdjustScaleDown().value()
== 0;
+ } else {
+ NamedConfigurationTree<DistributionZoneConfiguration,
DistributionZoneView, DistributionZoneChange> zones =
+ zonesConfiguration.distributionZones();
+
+ for (int i = 0; i < zones.value().size(); i++) {
+ DistributionZoneView zone = zones.value().get(i);
+
+ if (zone.zoneId() == zoneId) {
+ immediateScaleUp0 = zone.dataNodesAutoAdjustScaleUp() == 0;
+ immediateScaleDown0 = zone.dataNodesAutoAdjustScaleDown()
== 0;
+
+ break;
+ }
+ }
+ }
+
+ boolean immediateScaleUp = immediateScaleUp0;
+ boolean immediateScaleDown = immediateScaleDown0;
+
+ if (!immediateScaleUp && !immediateScaleDown) {
+ synchronized (dataNodesMutex) {
+
+ DataNodes dataNodes = this.dataNodes.get(zoneId);
+
+ Set<String> nodes;
+
+ if (dataNodes == null) {
+ nodes = emptySet();
+ } else {
+ nodes = this.dataNodes.get(zoneId).nodes();
+ }
+
+
+ return completedFuture(nodes);
+ }
+ }
+
+ CompletableFuture<Void> topVerScaleUpFut = new CompletableFuture<>();
+
+ topVerFut.thenAccept(ignored -> {
+ synchronized (dataNodesMutex) {
+ CompletableFuture<Void> topVerScaleUpFut0 = null;
+
+ if (immediateScaleUp) {
+ Map.Entry<Long, Long> scaleUpRevisionEntry =
topVerAndScaleUpRevision.ceilingEntry(topVer);
+
+ Long scaleUpRevision = null;
+
+ if (scaleUpRevisionEntry != null) {
+ scaleUpRevision = scaleUpRevisionEntry.getValue();
+ }
+
+ if (scaleUpRevision != null &&
dataNodes.get(zoneId).scaleUpRevision() < scaleUpRevision) {
Review Comment:
Thanks. Fixed it here and in other places.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]