xtern commented on code in PR #4137:
URL: https://github.com/apache/ignite-3/pull/4137#discussion_r1709000167
##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -376,10 +406,110 @@ private static List<String> missingNodes(Set<String>
requiredNodes, Collection<L
return
requiredNodes.stream().filter(not(logicalNodeIds::contains)).collect(Collectors.toList());
}
+ CompletableFuture<Void> propagateTimeToReplicas(long minimumRequiredTime) {
+ Map<Integer, Integer> tablesWithPartitions =
catalogManagerFacade.collectTablesWithPartitionsBetween(
+ minimumRequiredTime,
+ clockService.nowLong()
+ );
+
+ return invokeOnReplicas(
+ tablesWithPartitions.entrySet().iterator(),
+ minimumRequiredTime,
+ clockService.now()
+ );
+ }
+
+ private CompletableFuture<Void> invokeOnReplicas(
+ Iterator<Map.Entry<Integer, Integer>> tabItr,
+ long txBeginTime,
+ HybridTimestamp nowTs
+ ) {
+ if (!tabItr.hasNext()) {
+ return CompletableFutures.nullCompletedFuture();
+ }
+
+ Entry<Integer, Integer> tableWithPartsCount = tabItr.next();
+ int parts = tableWithPartsCount.getValue();
+
+ List<CompletableFuture<?>> partitionFutures = new ArrayList<>();
+
+ for (int p = 0; p < parts; p++) {
+ TablePartitionId replicationGroupId = new
TablePartitionId(tableWithPartsCount.getKey(), p);
+
+ CompletableFuture<Object> fut =
placementDriver.getAssignments(replicationGroupId, nowTs)
+ .thenCompose(tokenizedAssignments -> {
+ if (tokenizedAssignments == null) {
+
throwAssignmentsNotReadyException(replicationGroupId);
+ }
+
+ Assignment assignment =
tokenizedAssignments.nodes().iterator().next();
+
+ TablePartitionIdMessage partIdMessage =
ReplicaMessageUtils.toTablePartitionIdMessage(
+ REPLICA_MESSAGES_FACTORY,
+ replicationGroupId
+ );
+
+ UpdateMinimumActiveTxBeginTimeReplicaRequest msg =
REPLICATION_MESSAGES_FACTORY
+ .updateMinimumActiveTxBeginTimeReplicaRequest()
+ .groupId(partIdMessage)
+ .timestamp(txBeginTime)
+ .build();
+
+ return
replicaService.invoke(assignment.consistentId(), msg);
+ });
+
+ partitionFutures.add(fut);
+ }
+
+ return CompletableFutures.allOf(partitionFutures)
Review Comment:
> why it's safe to silently ignore all types of exceptions
We don't ignore exceptions here. Exception will be logged.
```
CompletableFuture<Void> propagateToReplicasFut =
propagateTimeToReplicas(minActiveTxBeginTime)
.whenComplete((res, ex) -> {
if (ex != null) {
LOG.warn("Failed to propagate minimum
active tx begin time to replicas", ex);
}
});
```
> why recursive call is considered as a proper failover
We don't have any failover :thinking: process will be aborted in case of
any exception with exception logging
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]