alievmirza commented on code in PR #7018:
URL: https://github.com/apache/ignite-3/pull/7018#discussion_r2558954501
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -382,6 +390,22 @@ public CompletableFuture<Set<String>> dataNodes(HybridTimestamp timestamp, int c
        return dataNodesManager.dataNodes(zoneId, timestamp, catalogVersion);
    }

+    /**
+     * Gets data nodes of the zone using catalog version. The timestamp which is used for data nodes retrieval is taken from the catalog
+     * with the given {@code catalogVersion}.
+     *
+     * <p>Return data nodes or throw the exception:
+     * {@link IllegalArgumentException} if zoneId is not valid.
+     * {@link DistributionZoneNotFoundException} if the zone with the provided zoneId does not exist.
+     *
+     * @param catalogVersion Catalog version.
+     * @param zoneId Zone id.
+     * @return The future with data nodes for the zoneId.
+     */
+    public CompletableFuture<Set<String>> dataNodes(int catalogVersion, int zoneId) {
+        return dataNodes(INITIAL_TIMESTAMP, catalogVersion, zoneId);
+    }
+
    public static Set<Node> dataNodes(Map<Node, Integer> dataNodesMap) {
Review Comment:
This method is not used; please remove it.
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DataNodesManager.java:
##########
@@ -1065,14 +1064,43 @@ private Operation addNewEntryToDataNodesHistory(
        if (!addMandatoryEntry
                && !history.isEmpty()
                && nodes.equals(history.dataNodesForTimestamp(HybridTimestamp.MAX_VALUE).dataNodes())) {
-            return noop();
+            return null;
        } else {
-            DataNodesHistory newHistory = history.addHistoryEntry(timestamp, nodes);
+            HybridTimestamp now = clockService.current();
+            HybridTimestamp earliestTimestampNeededForHistory = earliestTimestampNeededForHistory(now);
+            DataNodesHistory newHistory = history
+                    .addHistoryEntry(timestamp, nodes)
+                    .compactIfNeeded(earliestTimestampNeededForHistory);
            dataNodesHistoryVolatile.put(zoneId, newHistory);
-            return put(zoneDataNodesHistoryKey(zoneId), DataNodesHistorySerializer.serialize(newHistory));
+
+            int compactedEntriesCount = history.size() - (newHistory.size() - 1);
+            if (compactedEntriesCount > 0) {
+                LOG.info("Data nodes history compacted [zoneId={}, compactedEntriesCount={}, atTimestamp={}, earliestTimestampNeeded={}].",
+                        zoneId, compactedEntriesCount, now, earliestTimestampNeededForHistory);
+            }
+
+            return newHistory;
        }
    }

+    private Operation addNewEntryOperation(int zoneId, @Nullable DataNodesHistory history) {
+        if (history == null) {
+            return noop();
+        } else {
+            return put(zoneDataNodesHistoryKey(zoneId), DataNodesHistorySerializer.serialize(history));
+        }
+    }
+
+    private HybridTimestamp earliestTimestampNeededForHistory(HybridTimestamp timestamp) {
+        long minTimeAvailable = timestamp.getPhysical()
+                - gcConfiguration.lowWatermark().dataAvailabilityTimeMillis().value()
+                - clockService.maxClockSkewMillis();
+        long minCatalogTimeAvailable = hybridTimestamp(catalogManager.earliestCatalog().time()).getPhysical();
+
+        long minPhysical = max(min(minTimeAvailable, minCatalogTimeAvailable), 1);
Review Comment:
should we take into account `maxClockSkewMillis` here as well?
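
For illustration only, one possible reading of this question as a sketch (not part of the PR; whether the skew belongs on the catalog-derived bound is exactly what is being asked):
```
// Hypothetical adjustment: also widen the catalog-derived lower bound by the maximum
// clock skew, so that a reader whose clock lags behind the node that wrote the history
// can still resolve data nodes for the earliest non-compacted catalog version.
long minCatalogTimeAvailable = hybridTimestamp(catalogManager.earliestCatalog().time()).getPhysical()
        - clockService.maxClockSkewMillis();

long minPhysical = max(min(minTimeAvailable, minCatalogTimeAvailable), 1);
```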
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DataNodesHistory.java:
##########
@@ -62,6 +62,26 @@ public DataNodesHistory addHistoryEntry(HybridTimestamp timestamp, Set<NodeWithA
        return dataNodesHistory;
    }

+    /**
+     * Copies existing history and compacts it to the given timestamp inclusively. Leaves at least one entry.
+     *
+     * @param toTimestamp Minimal timestamp to leave in the history.
+     * @return New data nodes history.
+     */
+    public DataNodesHistory compactIfNeeded(HybridTimestamp toTimestamp) {
Review Comment:
package-private?
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DataNodesManager.java:
##########
@@ -1065,14 +1064,43 @@ private Operation addNewEntryToDataNodesHistory(
        if (!addMandatoryEntry
                && !history.isEmpty()
                && nodes.equals(history.dataNodesForTimestamp(HybridTimestamp.MAX_VALUE).dataNodes())) {
-            return noop();
+            return null;
        } else {
-            DataNodesHistory newHistory = history.addHistoryEntry(timestamp, nodes);
+            HybridTimestamp now = clockService.current();
+            HybridTimestamp earliestTimestampNeededForHistory = earliestTimestampNeededForHistory(now);
+            DataNodesHistory newHistory = history
+                    .addHistoryEntry(timestamp, nodes)
+                    .compactIfNeeded(earliestTimestampNeededForHistory);
            dataNodesHistoryVolatile.put(zoneId, newHistory);
-            return put(zoneDataNodesHistoryKey(zoneId), DataNodesHistorySerializer.serialize(newHistory));
+
+            int compactedEntriesCount = history.size() - (newHistory.size() - 1);
+            if (compactedEntriesCount > 0) {
+                LOG.info("Data nodes history compacted [zoneId={}, compactedEntriesCount={}, atTimestamp={}, earliestTimestampNeeded={}].",
+                        zoneId, compactedEntriesCount, now, earliestTimestampNeededForHistory);
+            }
+
+            return newHistory;
        }
    }

+    private Operation addNewEntryOperation(int zoneId, @Nullable DataNodesHistory history) {
+        if (history == null) {
+            return noop();
+        } else {
+            return put(zoneDataNodesHistoryKey(zoneId), DataNodesHistorySerializer.serialize(history));
+        }
+    }
+
+    private HybridTimestamp earliestTimestampNeededForHistory(HybridTimestamp timestamp) {
+        long minTimeAvailable = timestamp.getPhysical()
+                - gcConfiguration.lowWatermark().dataAvailabilityTimeMillis().value()
+                - clockService.maxClockSkewMillis();
Review Comment:
Shouldn't it be `+ maxClockSkewMillis`?
Also, please add a describing comment explaining the physical meaning of all these calculations; a Javadoc would be even better.
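
For illustration, a sketch of the kind of Javadoc this could get (the wording of the physical meaning is an assumption about the intended semantics, not taken from the PR):
```
/**
 * Returns the earliest timestamp for which the data nodes history must still be resolvable;
 * entries older than this may be compacted away. Two lower bounds are combined:
 *
 * <ul>
 *     <li>{@code now - dataAvailabilityTime - maxClockSkew}: the oldest moment readers may
 *     still query under the low watermark, widened by the maximum clock skew between nodes;</li>
 *     <li>the activation time of the earliest non-compacted catalog version.</li>
 * </ul>
 *
 * <p>The smaller of the two bounds wins, so history needed either by readers or by the
 * earliest catalog version is preserved.
 *
 * @param timestamp Current timestamp.
 * @return Earliest timestamp that must remain resolvable in the history.
 */
```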
##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DataNodesManagerTest.java:
##########
@@ -478,6 +496,75 @@ public void nodeRejoinWithDifferentAttributesWithAutoAdjustAlteration() throws I
        assertEquals(C_DIFFERENT_ATTRS.userAttributes().get("region"), c.userAttributes().get("region"));
    }

+    @Test
+    public void testHistoryCompaction() throws InterruptedException {
+        alterZone(ZONE_NAME_1, 0, 0, null);
+
+        removeNodes(Set.of(C));
Review Comment:
I do not understand why you remove C if C is not in the data nodes.
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DataNodesManager.java:
##########
@@ -1065,14 +1064,43 @@ private Operation addNewEntryToDataNodesHistory(
        if (!addMandatoryEntry
                && !history.isEmpty()
                && nodes.equals(history.dataNodesForTimestamp(HybridTimestamp.MAX_VALUE).dataNodes())) {
-            return noop();
+            return null;
        } else {
-            DataNodesHistory newHistory = history.addHistoryEntry(timestamp, nodes);
+            HybridTimestamp now = clockService.current();
+            HybridTimestamp earliestTimestampNeededForHistory = earliestTimestampNeededForHistory(now);
+            DataNodesHistory newHistory = history
+                    .addHistoryEntry(timestamp, nodes)
+                    .compactIfNeeded(earliestTimestampNeededForHistory);
            dataNodesHistoryVolatile.put(zoneId, newHistory);
-            return put(zoneDataNodesHistoryKey(zoneId), DataNodesHistorySerializer.serialize(newHistory));
+
+            int compactedEntriesCount = history.size() - (newHistory.size() - 1);
+            if (compactedEntriesCount > 0) {
+                LOG.info("Data nodes history compacted [zoneId={}, compactedEntriesCount={}, atTimestamp={}, earliestTimestampNeeded={}].",
+                        zoneId, compactedEntriesCount, now, earliestTimestampNeededForHistory);
+            }
+
+            return newHistory;
        }
    }

+    private Operation addNewEntryOperation(int zoneId, @Nullable DataNodesHistory history) {
+        if (history == null) {
+            return noop();
+        } else {
+            return put(zoneDataNodesHistoryKey(zoneId), DataNodesHistorySerializer.serialize(history));
+        }
+    }
+
+    private HybridTimestamp earliestTimestampNeededForHistory(HybridTimestamp timestamp) {
+        long minTimeAvailable = timestamp.getPhysical()
Review Comment:
Can't we use `org.apache.ignite.internal.lowwatermark.LowWatermarkImpl#getLowWatermark`?
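
A minimal sketch of what that might look like, assuming a `LowWatermark` component is injected into `DataNodesManager` (the field name and the null fallback are assumptions, not part of the PR):
```
// Hypothetical: take the lower bound from the low-watermark component instead of
// recomputing it from the GC configuration. If no watermark has been raised yet
// (assumed null return), fall back to keeping the whole history.
HybridTimestamp lwm = lowWatermark.getLowWatermark();
long minTimeAvailable = lwm == null ? 1 : lwm.getPhysical() - clockService.maxClockSkewMillis();
```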
##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DataNodesManagerTest.java:
##########
@@ -478,6 +496,75 @@ public void nodeRejoinWithDifferentAttributesWithAutoAdjustAlteration() throws I
        assertEquals(C_DIFFERENT_ATTRS.userAttributes().get("region"), c.userAttributes().get("region"));
    }

+    @Test
+    public void testHistoryCompaction() throws InterruptedException {
+        alterZone(ZONE_NAME_1, 0, 0, null);
+
+        removeNodes(Set.of(C));
+
+        waitForDataNodes(ZONE_NAME_1, nodeNames(A, B));
+
+        HybridTimestamp afterRemovalTs = clock.now();
+
+        // Greater than the low watermark data availability time (500 ms).
+        Thread.sleep(600);
+
+        CatalogZoneDescriptor zoneDescriptor = catalogManager.activeCatalog(clock.currentLong()).zone(ZONE_NAME_1);
+        catalog = mock(Catalog.class);
+        when(catalog.time()).thenAnswer(inv -> clock.now().longValue());
+        when(catalog.zone(anyInt())).thenAnswer(inv -> zoneDescriptor);
+
+        addNodes(Set.of(C));
+
+        waitForDataNodes(ZONE_NAME_1, nodeNames(A, B, C));
+
+        // Check that data nodes history is compacted.
+        DataNodesHistoryEntry compactedHistoryEntry = dataNodesHistory(ZONE_NAME_1).dataNodesForTimestamp(afterRemovalTs);
+        assertEquals(HybridTimestamp.MIN_VALUE, compactedHistoryEntry.timestamp());
+        assertTrue(compactedHistoryEntry.dataNodes().isEmpty());
+    }
+
+    @Test
+    public void testHistoryCompactionLeavesEntriesForNonCompactedCatalog() throws InterruptedException {
+        alterZone(ZONE_NAME_1, 0, 0, null);
+
+        DataNodesHistoryEntry firstEntry = dataNodesHistory(ZONE_NAME_1).dataNodesForTimestamp(clock.current());
+
+        // Greater than the low watermark data availability time (500 ms).
+        Thread.sleep(600);
+
+        addNodes(Set.of(C));
+
+        waitForDataNodes(ZONE_NAME_1, nodeNames(A, B, C));
+
+        // Check that data nodes history is not compacted.
+        DataNodesHistoryEntry nonCompactedHistoryEntry = dataNodesHistory(ZONE_NAME_1).dataNodesForTimestamp(firstEntry.timestamp());
+        assertEquals(firstEntry.timestamp(), nonCompactedHistoryEntry.timestamp());
+        assertEquals(firstEntry.dataNodes(), nonCompactedHistoryEntry.dataNodes());
+    }
+
+    @Test
+    public void testHistoryCompactionLeavesEntriesForDataAvailability() throws InterruptedException {
Review Comment:
please make this test more deterministic
##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DataNodesManagerTest.java:
##########
@@ -478,6 +496,75 @@ public void nodeRejoinWithDifferentAttributesWithAutoAdjustAlteration() throws I
        assertEquals(C_DIFFERENT_ATTRS.userAttributes().get("region"), c.userAttributes().get("region"));
    }

+    @Test
+    public void testHistoryCompaction() throws InterruptedException {
+        alterZone(ZONE_NAME_1, 0, 0, null);
+
+        removeNodes(Set.of(C));
+
+        waitForDataNodes(ZONE_NAME_1, nodeNames(A, B));
+
+        HybridTimestamp afterRemovalTs = clock.now();
+
+        // Greater than the low watermark data availability time (500 ms).
+        Thread.sleep(600);
+
+        CatalogZoneDescriptor zoneDescriptor = catalogManager.activeCatalog(clock.currentLong()).zone(ZONE_NAME_1);
+        catalog = mock(Catalog.class);
+        when(catalog.time()).thenAnswer(inv -> clock.now().longValue());
+        when(catalog.zone(anyInt())).thenAnswer(inv -> zoneDescriptor);
+
+        addNodes(Set.of(C));
+
+        waitForDataNodes(ZONE_NAME_1, nodeNames(A, B, C));
+
+        // Check that data nodes history is compacted.
+        DataNodesHistoryEntry compactedHistoryEntry = dataNodesHistory(ZONE_NAME_1).dataNodesForTimestamp(afterRemovalTs);
+        assertEquals(HybridTimestamp.MIN_VALUE, compactedHistoryEntry.timestamp());
+        assertTrue(compactedHistoryEntry.dataNodes().isEmpty());
Review Comment:
let's add
```
HybridTimestamp now = clock.now();
DataNodesHistoryEntry actualHistoryEntry = dataNodesHistory(ZONE_NAME_1).dataNodesForTimestamp(now);
assertTrue(actualHistoryEntry.dataNodes().containsAll(asList(A, B, C)));
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]