sergeyuttsel commented on code in PR #1729:
URL: https://github.com/apache/ignite-3/pull/1729#discussion_r1156787450
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -917,24 +1213,69 @@ public CompletableFuture<Void> onUpdate(WatchEvent evt) {
}
try {
- assert evt.single() : "Expected an event with one entry
but was an event with several entries with keys: "
+ assert evt.entryEvents().size() == 2 :
+ "Expected an event with logical topology and
logical topology version entries but was events with keys: "
+ evt.entryEvents().stream().map(entry ->
entry.newEntry() == null ? "null" : entry.newEntry().key())
.collect(toList());
- Entry newEntry = evt.entryEvent().newEntry();
+ long topVer = 0;
+
+ byte[] newLogicalTopologyBytes = null;
+
+ Set<String> newLogicalTopology = null;
+
+ long revision = 0;
- long revision = newEntry.revision();
+ for (EntryEvent event : evt.entryEvents()) {
+ Entry e = event.newEntry();
- byte[] newLogicalTopologyBytes = newEntry.value();
+ if (Arrays.equals(e.key(),
zonesLogicalTopologyVersionKey().bytes())) {
+ topVer = bytesToLong(e.value());
- Set<String> newLogicalTopology =
fromBytes(newLogicalTopologyBytes);
+ revision = e.revision();
+ } else if (Arrays.equals(e.key(),
zonesLogicalTopologyKey().bytes())) {
+ newLogicalTopologyBytes = e.value();
+
+ newLogicalTopology =
fromBytes(newLogicalTopologyBytes);
+ }
+ }
+
+ assert newLogicalTopology != null;
+ assert revision > 0;
+
+ Set<String> newLogicalTopology0 = newLogicalTopology;
Set<String> removedNodes =
- logicalTopology.stream().filter(node ->
!newLogicalTopology.contains(node)).collect(toSet());
+ logicalTopology.stream().filter(node ->
!newLogicalTopology0.contains(node)).collect(toSet());
Set<String> addedNodes =
newLogicalTopology.stream().filter(node ->
!logicalTopology.contains(node)).collect(toSet());
+ synchronized (dataNodesMutex) {
+ lastTopVer = topVer;
+
+ //Associates topology version and scale up meta
storage revision.
+ if (!addedNodes.isEmpty()) {
+ topVerAndScaleUpRevision.put(topVer, revision);
+
+ topVerAndScaleUpRevision.headMap(topVer).clear();
+ }
+
+ //Associates topology version and scale down meta
storage revision.
+ if (!removedNodes.isEmpty()) {
+ topVerAndScaleDownRevision.put(topVer, revision);
+
+ topVerAndScaleDownRevision.headMap(topVer).clear();
Review Comment:
Fixed. The map was removed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]