denis-chudov commented on code in PR #2095:
URL: https://github.com/apache/ignite-3/pull/2095#discussion_r1279105584


##########
modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java:
##########
@@ -465,23 +468,30 @@ public static <T> void assertValueInStorage(
      */
     public static void assertDataNodesFromManager(
             DistributionZoneManager distributionZoneManager,
+            Supplier<Long> causalityToken,
             int zoneId,
             @Nullable Set<LogicalNode> expectedValue,
             long timeoutMillis
-    ) throws InterruptedException {
+    ) throws InterruptedException, ExecutionException, TimeoutException {
         Set<String> expectedValueNames =
                 expectedValue == null ? null : 
expectedValue.stream().map(ClusterNode::name).collect(Collectors.toSet());
 
         boolean success = waitForCondition(() -> {
-            // TODO: https://issues.apache.org/jira/browse/IGNITE-19506 change 
this to the causality versioned call to dataNodes.
-            Set<String> dataNodes = distributionZoneManager.dataNodes(zoneId);
+            Set<String> dataNodes = null;
+            try {
+                dataNodes = 
distributionZoneManager.dataNodes(causalityToken.get(), zoneId).get(5, 
TimeUnit.SECONDS);
+            } catch (Exception e) {
+                // Ignore
+            }
 
             return Objects.equals(dataNodes, expectedValueNames);
         }, timeoutMillis);
 
         // We do a second check simply to print a nice error message in case 
the condition above is not achieved.
         if (!success) {
-            Set<String> dataNodes = distributionZoneManager.dataNodes(zoneId);
+            Set<String> dataNodes = null;
+
+            dataNodes = 
distributionZoneManager.dataNodes(causalityToken.get(), zoneId).get(5, 
TimeUnit.SECONDS);

Review Comment:
   The declaration and the assignment of `dataNodes` can be merged into one line.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -598,69 +598,69 @@ private CompletableFuture<?> 
onTableCreate(ConfigurationNotificationEvent<TableV
             CatalogTableDescriptor tableDescriptor = 
toTableDescriptor(ctx.newValue());
             CatalogZoneDescriptor zoneDescriptor = 
getZoneDescriptor(tableDescriptor.zoneId());
 
-            List<Set<Assignment>> assignments;
+            CompletableFuture<List<Set<Assignment>>> assignmentsFuture;
 
             int tableId = tableDescriptor.id();
 
             // Check if the table already has assignments in the vault.
             // So, it means, that it is a recovery process and we should use 
the vault assignments instead of calculation for the new ones.
             if (partitionAssignments(vaultManager, tableId, 0) != null) {
-                assignments = tableAssignments(vaultManager, tableId, 
zoneDescriptor.partitions());
+                assignmentsFuture = 
completedFuture(tableAssignments(vaultManager, tableId, 
zoneDescriptor.partitions()));
             } else {
-                assignments = AffinityUtils.calculateAssignments(
-                        // TODO: 
https://issues.apache.org/jira/browse/IGNITE-19425 use data nodes from 
DistributionZoneManager instead.
-                        
baselineMgr.nodes().stream().map(ClusterNode::name).collect(toList()),
-                        zoneDescriptor.partitions(),
-                        zoneDescriptor.replicas()
-                );
+                assignmentsFuture = 
distributionZoneManager.dataNodes(ctx.storageRevision(), 
tableDescriptor.zoneId())
+                        .thenApply(dataNodes -> 
AffinityUtils.calculateAssignments(
+                                dataNodes,
+                                zoneDescriptor.partitions(),
+                                zoneDescriptor.replicas()
+                        ));
             }
 
-            assert !assignments.isEmpty() : "Couldn't create the table with 
empty assignments.";
-
             CompletableFuture<?> createTableFut = createTableLocally(
                     ctx.storageRevision(),
                     tableDescriptor,
                     zoneDescriptor,
-                    assignments
+                    assignmentsFuture
             ).whenComplete((v, e) -> {
                 if (e == null) {
                     for (var listener : assignmentsChangeListeners) {
                         listener.accept(this);
                     }
                 }
-            });
-
-            // TODO: https://issues.apache.org/jira/browse/IGNITE-19506 
Probably should be reworked so that
-            // the future is returned along with createTableFut. Right now it 
will break some tests.
-            writeTableAssignmentsToMetastore(tableId, assignments);
+            }).thenCompose(ignored -> 
writeTableAssignmentsToMetastore(tableId, assignmentsFuture));
 
             return createTableFut;
         } finally {
             busyLock.leaveBusy();
         }
     }
 
-    private CompletableFuture<Boolean> writeTableAssignmentsToMetastore(int 
tableId, List<Set<Assignment>> assignments) {
-        assert !assignments.isEmpty();
+    private CompletableFuture<Boolean> writeTableAssignmentsToMetastore(
+            int tableId,
+            CompletableFuture<List<Set<Assignment>>> assignmentsFuture
+    ) {
+        return assignmentsFuture.thenCompose(newAssignments -> {
+            assert !newAssignments.isEmpty();
 
-        List<Operation> partitionAssignments = new 
ArrayList<>(assignments.size());
+            List<Operation> partitionAssignments = new 
ArrayList<>(newAssignments.size());
 
-        for (int i = 0; i < assignments.size(); i++) {
-            partitionAssignments.add(put(
-                    stablePartAssignmentsKey(
-                            new TablePartitionId(tableId, i)),
-                    ByteUtils.toBytes(assignments.get(i))));
-        }
+            for (int i = 0; i < newAssignments.size(); i++) {
+                partitionAssignments.add(put(
+                        stablePartAssignmentsKey(
+                                new TablePartitionId(tableId, i)),
+                        ByteUtils.toBytes(newAssignments.get(i))));
+            }
 
-        Condition condition = Conditions.notExists(new 
ByteArray(partitionAssignments.get(0).key()));
+            Condition condition = Conditions.notExists(new 
ByteArray(partitionAssignments.get(0).key()));
 
-        return metaStorageMgr
-                .invoke(condition, partitionAssignments, 
Collections.emptyList())
-                .exceptionally(e -> {
-                    LOG.error("Couldn't write assignments to metastore", e);
+            return metaStorageMgr
+                    .invoke(condition, partitionAssignments, 
Collections.emptyList())
+                    .exceptionally(e -> {
+                        LOG.error("Couldn't write assignments to metastore", 
e);
+
+                        return null;
+                    });
+        });
 

Review Comment:
   Please remove the empty line.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to