sanpwc commented on code in PR #1729:
URL: https://github.com/apache/ignite-3/pull/1729#discussion_r1157494481
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -501,6 +543,222 @@ public int getZoneId(String name) {
}
}
+ /**
+ * The method for obtaining data nodes of the specified zone.
+ * The flow for the future completion:
+ * Waits for DistributionZoneManager to observe the passed topology version or
a greater version in topologyWatchListener.
+ * If the {@link
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp} equals 0,
then waits for the data nodes update triggered
+ * by started nodes and corresponding to the passed topology version or
a greater topology version
+ * to be written to the meta storage.
+ * If the {@link
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown} equals 0,
then waits for the data nodes update
+ * triggered by stopped nodes and corresponding to the passed topology
version or a greater topology version
+ * to be written to the meta storage.
+ * After waiting it returns the future with data nodes of the specified
zone.
+ *
+ * @param zoneId Zone id.
+ * @param topVer Topology version.
+ * @return The data nodes future.
+ */
+ public CompletableFuture<Set<String>> topologyVersionedDataNodes(int
zoneId, long topVer) {
+ CompletableFuture<Void> topVerFut = awaitTopologyVersion(topVer);
+
+ CompletableFuture<IgniteBiTuple<Boolean, Boolean>> timerValuesFut =
getImmediateTimers(zoneId, topVerFut);
+
+ CompletableFuture<Void> topVerScaleUpFut = scaleUpAwaiting(zoneId,
timerValuesFut);
+
+ CompletableFuture<Void> topVerScaleDownFut = scaleDownAwaiting(zoneId,
timerValuesFut);
+
+ return getDataNodesFuture(zoneId, topVerScaleUpFut,
topVerScaleDownFut);
+ }
+
+ /**
+ * Waits for DistributionZoneManager to observe the passed topology version or
a greater version in topologyWatchListener.
+ *
+ * @param topVer Topology version.
+ * @return Future for chaining.
+ */
+ private CompletableFuture<Void> awaitTopologyVersion(long topVer) {
+ synchronized (dataNodesMutex) {
+ if (topVer > lastTopVer) {
+ return topVerFutures.computeIfAbsent(topVer, key -> new
CompletableFuture<>());
+ } else {
+ return completedFuture(null);
+ }
+ }
+ }
+
+ /**
+ * Transforms {@link
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp}
+ * and {@link
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown} values to
boolean values.
+ * Each value is true if the setting equals zero and false if it is greater than zero. Zero means
that the data nodes change must be scheduled immediately.
+ *
+ * @param zoneId Zone id.
+ * @param topVerFut Future for chaining.
+ * @return Future.
+ */
+ private CompletableFuture<IgniteBiTuple<Boolean, Boolean>>
getImmediateTimers(int zoneId, CompletableFuture<Void> topVerFut) {
+ CompletableFuture<IgniteBiTuple<Boolean, Boolean>> timerValuesFut =
new CompletableFuture<>();
+
+ topVerFut.thenAccept(ignored -> {
+ DistributionZoneConfiguration zoneCfg =
getZoneById(zonesConfiguration, zoneId);
+
+ timerValuesFut.complete(new IgniteBiTuple<>(
+ zoneCfg.dataNodesAutoAdjustScaleUp().value() == 0,
Review Comment:
Should the literal 0 be extracted into a named constant?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]