sashapolo commented on code in PR #2055:
URL: https://github.com/apache/ignite-3/pull/2055#discussion_r1191990740


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchema.java:
##########
@@ -125,34 +117,24 @@ public TableDefinitionDiff diffFrom(FullTableSchema 
prevSchema) {
             }
         }
 
-        Map<String, IndexDescriptor> prevIndexesByName = 
prevSchema.indexes.stream()
-                .collect(toMap(IndexDescriptor::name, identity()));
-        Map<String, IndexDescriptor> thisIndexesByName = this.indexes.stream()
-                .collect(toMap(IndexDescriptor::name, identity()));
+        Map<String, IndexDescriptor> prevIndexesByName = 
toMapByName(prevSchema.indexes, IndexDescriptor::name);
+        Map<String, IndexDescriptor> thisIndexesByName = 
toMapByName(this.indexes, IndexDescriptor::name);
 
-        Set<String> addedIndexNames = subtract(thisIndexesByName.keySet(), 
prevIndexesByName.keySet());
-        Set<String> removedIndexNames = subtract(prevIndexesByName.keySet(), 
thisIndexesByName.keySet());
-
-        List<IndexDescriptor> addedIndexes = 
thisIndexesByName.values().stream()
-                .filter(col -> addedIndexNames.contains(col.name()))
-                .collect(toList());
-        List<IndexDescriptor> removedIndexes = 
prevIndexesByName.values().stream()
-                .filter(col -> removedIndexNames.contains(col.name()))
-                .collect(toList());
+        List<IndexDescriptor> addedIndexes = subtractKeyed(thisIndexesByName, 
prevIndexesByName);
+        List<IndexDescriptor> removedIndexes = 
subtractKeyed(prevIndexesByName, thisIndexesByName);
 
         return new TableDefinitionDiff(addedColumns, removedColumns, 
changedColumns, addedIndexes, removedIndexes);
     }
 
-    private static Set<String> subtract(Set<String> minuend, Set<String> 
subtrahend) {
-        Set<String> result = new HashSet<>(minuend);
-        result.removeAll(subtrahend);
-        return result;
+    private static <T> Map<String, T> toMapByName(List<T> elements, 
Function<T, String> nameExtractor) {
+        return elements.stream().collect(toMap(nameExtractor, identity()));
     }
 
-    private static Set<String> intersect(Set<String> set1, Set<String> set2) {
-        Set<String> result = new HashSet<>(set1);
-        result.retainAll(set2);
-        return result;
+    private static <T> List<T> subtractKeyed(Map<String, T> diminuend, 
Map<String, T> subtrahend) {
+        return diminuend.keySet().stream()

Review Comment:
   You can use `entrySet` here to avoid calling `get` afterwards



##########
modules/schema/src/main/java/org/apache/ignite/internal/schema/NativeTypeSpec.java:
##########
@@ -355,6 +359,66 @@ public static Class<?> toClass(NativeTypeSpec spec, 
boolean nullable) {
         }
     }
 
+    /**
+     * Gets client type corresponding to this server type.
+     *
+     * @return Client type code.
+     */
+    public ColumnType asColumnType() {
+        switch (this) {
+            case INT8:
+                return ColumnType.INT8;
+
+            case INT16:
+                return ColumnType.INT16;
+
+            case INT32:
+                return ColumnType.INT32;
+
+            case INT64:
+                return ColumnType.INT64;
+
+            case FLOAT:
+                return ColumnType.FLOAT;
+
+            case DOUBLE:
+                return ColumnType.DOUBLE;
+
+            case DECIMAL:
+                return ColumnType.DECIMAL;
+
+            case NUMBER:
+                return ColumnType.NUMBER;
+
+            case UUID:
+                return ColumnType.UUID;
+
+            case STRING:
+                return ColumnType.STRING;
+
+            case BYTES:
+                return ColumnType.BYTE_ARRAY;
+
+            case BITMASK:
+                return ColumnType.BITMASK;
+
+            case DATE:
+                return ColumnType.DATE;
+
+            case TIME:
+                return ColumnType.TIME;
+
+            case DATETIME:
+                return ColumnType.DATETIME;
+
+            case TIMESTAMP:
+                return ColumnType.TIMESTAMP;
+
+            default:
+                throw new IgniteException(PROTOCOL_ERR, "Unsupported native 
type: " + this);

Review Comment:
   I don't think using `PROTOCOL_ERR` is correct here, because it's an error 
type from the Thin Client



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchema.java:
##########
@@ -125,34 +117,24 @@ public TableDefinitionDiff diffFrom(FullTableSchema 
prevSchema) {
             }
         }
 
-        Map<String, IndexDescriptor> prevIndexesByName = 
prevSchema.indexes.stream()
-                .collect(toMap(IndexDescriptor::name, identity()));
-        Map<String, IndexDescriptor> thisIndexesByName = this.indexes.stream()
-                .collect(toMap(IndexDescriptor::name, identity()));
+        Map<String, IndexDescriptor> prevIndexesByName = 
toMapByName(prevSchema.indexes, IndexDescriptor::name);
+        Map<String, IndexDescriptor> thisIndexesByName = 
toMapByName(this.indexes, IndexDescriptor::name);
 
-        Set<String> addedIndexNames = subtract(thisIndexesByName.keySet(), 
prevIndexesByName.keySet());
-        Set<String> removedIndexNames = subtract(prevIndexesByName.keySet(), 
thisIndexesByName.keySet());
-
-        List<IndexDescriptor> addedIndexes = 
thisIndexesByName.values().stream()
-                .filter(col -> addedIndexNames.contains(col.name()))
-                .collect(toList());
-        List<IndexDescriptor> removedIndexes = 
prevIndexesByName.values().stream()
-                .filter(col -> removedIndexNames.contains(col.name()))
-                .collect(toList());
+        List<IndexDescriptor> addedIndexes = subtractKeyed(thisIndexesByName, 
prevIndexesByName);
+        List<IndexDescriptor> removedIndexes = 
subtractKeyed(prevIndexesByName, thisIndexesByName);
 
         return new TableDefinitionDiff(addedColumns, removedColumns, 
changedColumns, addedIndexes, removedIndexes);
     }
 
-    private static Set<String> subtract(Set<String> minuend, Set<String> 
subtrahend) {
-        Set<String> result = new HashSet<>(minuend);
-        result.removeAll(subtrahend);
-        return result;
+    private static <T> Map<String, T> toMapByName(List<T> elements, 
Function<T, String> nameExtractor) {
+        return elements.stream().collect(toMap(nameExtractor, identity()));
     }
 
-    private static Set<String> intersect(Set<String> set1, Set<String> set2) {
-        Set<String> result = new HashSet<>(set1);
-        result.retainAll(set2);
-        return result;
+    private static <T> List<T> subtractKeyed(Map<String, T> diminuend, 
Map<String, T> subtrahend) {

Review Comment:
   `diminuend`? What does that mean?



##########
modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java:
##########
@@ -468,7 +468,7 @@ public static CompletableFuture<Void> 
handleReduceChanged(MetaStorageManager met
      * @return Result of the subtraction.
      */
     public static <T> Set<T> subtract(Set<T> minuend, Set<T> subtrahend) {

Review Comment:
   Can we get rid of this method?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java:
##########
@@ -347,12 +347,12 @@ private void 
doOnNewPeersConfigurationApplied(PeersAndLearners configuration) {
             // For further addition
             Set<Assignment> calculatedSwitchAppend = 
union(retrievedSwitchAppend, reducedNodes);
             calculatedSwitchAppend = subtract(calculatedSwitchAppend, 
addedNodes);
-            calculatedSwitchAppend = intersect(calculatedAssignments, 
calculatedSwitchAppend);
+            calculatedSwitchAppend = 
CollectionUtils.intersect(calculatedAssignments, calculatedSwitchAppend);

Review Comment:
   I propose to add a static import here, looks strange otherwise



##########
modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java:
##########
@@ -596,4 +597,15 @@ public static <T> Set<T> difference(@Nullable Set<T> a, 
@Nullable Set<T> b) {
     public static IntSet setOf(Collection<Integer> coll) {
         return IntSets.unmodifiable(new IntOpenHashSet(coll));
     }
+
+    /**
+     * Returns an intersection of two sets.
+     *
+     * @param op1 First operand.
+     * @param op2 Second operand.
+     * @return Result of the intersection.
+     */
+    public static <T> Set<T> intersect(Set<T> op1, Set<T> op2) {

Review Comment:
   No subtrahend =(



##########
modules/core/src/test/java/org/apache/ignite/internal/util/CollectionUtilsTest.java:
##########
@@ -359,4 +361,9 @@ private void testSetOf(Collection<Integer> data) {
         assertThrows(UnsupportedOperationException.class, () -> copy.add(42));
         assertThrows(UnsupportedOperationException.class, () -> 
copy.remove(3));
     }
+
+    @Test
+    void testIntersect() {

Review Comment:
   Let's add a test for an empty intersection as well



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.table.distributed.schema.FullTableSchema;
+import org.apache.ignite.internal.table.distributed.schema.Schemas;
+import org.apache.ignite.internal.table.distributed.schema.TableDefinitionDiff;
+import org.apache.ignite.internal.tx.TransactionIds;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Validates schema compatibility.
+ */
+class SchemaCompatValidator {
+    private final Schemas schemas;
+
+    SchemaCompatValidator(Schemas schemas) {
+        this.schemas = schemas;
+    }
+
+    /**
+     * Performs forward compatibility validation (if needed). That is, for 
each table enlisted in the transaction,
+     * checks to see whether the initial schema (identified by the begin 
timestamp) is forward-compatible with the
+     * commit schema (identified by the commit timestamp).
+     *
+     * <p>If doing an abort (and not commit), the validation always succeeds.
+     *
+     * @param txId ID of the transaction that gets validated.
+     * @param enlistedGroupIds IDs of the partitions that are enlisted with 
the transaction.
+     * @param commit Whether it's a commit attempt (otherwise it's an abort).
+     * @param commitTimestamp Commit timestamp (or {@code null} if it's an 
abort).
+     * @return Future completed with validation result.
+     */
+    CompletableFuture<ForwardValidationResult> validateForwards(
+            UUID txId,
+            List<TablePartitionId> enlistedGroupIds,
+            boolean commit,
+            @Nullable HybridTimestamp commitTimestamp
+    ) {
+        if (!commit) {
+            return completedFuture(ForwardValidationResult.success());
+        }
+
+        HybridTimestamp beginTimestamp = TransactionIds.beginTimestamp(txId);
+
+        List<UUID> tableIds = enlistedGroupIds.stream()

Review Comment:
   I think it looks ok if extracted into a method:
   ```
   ForwardValidationResult validateTablesSchemas(
           List<TablePartitionId> enlistedGroupIds,
           HybridTimestamp beginTimestamp,
           HybridTimestamp commitTimestamp
   ) {
       return enlistedGroupIds.stream()
               .map(TablePartitionId::tableId)
               .distinct()
               .map(tableId -> validateSchemaCompatibility(beginTimestamp, 
commitTimestamp, tableId))
               .filter(validationResult -> !validationResult.isSuccessful())
               .findAny()
               .orElse(ForwardValidationResult.success());
   }
   ```



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java:
##########
@@ -290,6 +314,8 @@ public void beforeTest(@InjectConfiguration 
DataStorageConfiguration dsCfg) {
 
         
lenient().when(safeTimeClock.waitFor(any())).thenReturn(completedFuture(null));
 
+        
lenient().when(schemas.waitForSchemasAvailability(any())).thenReturn(completedFuture(null));

Review Comment:
   ok



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -976,13 +983,34 @@ private CompletableFuture<Void> 
processTxFinishAction(TxFinishReplicaRequest req
         List<TablePartitionId> aggregatedGroupIds = 
request.groups().values().stream()
                 .flatMap(List::stream)
                 .map(IgniteBiTuple::get1)
-                .collect(Collectors.toList());
+                .collect(toList());
 
         UUID txId = request.txId();
 
-        boolean commit = request.commit();
+        return schemaCompatValidator.validateForwards(txId, 
aggregatedGroupIds, request.commit(), request.commitTimestamp())
+                .thenCompose(validationResult -> {
+                    boolean stillCommit = request.commit() && 
validationResult.isOk();
 
-        CompletableFuture<Object> changeStateFuture = 
finishTransaction(aggregatedGroupIds, txId, commit);
+                    return finishAndCleanup(request, stillCommit, 
aggregatedGroupIds, txId)
+                            .thenCompose(result -> {
+                                if (validationResult.isOk()) {
+                                    return completedFuture(result);
+                                } else {
+                                    return failedFuture(new 
AbortDueToIncompatibleSchemaException("Commit failed because schema "

Review Comment:
   oops, I missed the `stillCommit` part



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -976,13 +983,34 @@ private CompletableFuture<Void> 
processTxFinishAction(TxFinishReplicaRequest req
         List<TablePartitionId> aggregatedGroupIds = 
request.groups().values().stream()
                 .flatMap(List::stream)
                 .map(IgniteBiTuple::get1)
-                .collect(Collectors.toList());
+                .collect(toList());
 
         UUID txId = request.txId();
 
-        boolean commit = request.commit();
+        return schemaCompatValidator.validateForwards(txId, 
aggregatedGroupIds, request.commit(), request.commitTimestamp())

Review Comment:
   > But the result is that finishAndCleanup() is called twice in different 
branches, and this seems weird.
   
   I don't know, but it looks perfectly fine to me:
   
   ```
   if (request.commit()) {
       return schemaCompatValidator.validateForwards(txId, aggregatedGroupIds, 
request.commitTimestamp())
               .thenCompose(validationResult -> {
                   return finishAndCleanup(request, 
validationResult.isSuccessful(), aggregatedGroupIds, txId)
                           .thenCompose(result -> 
failureIfValidationFailed(validationResult));
               });
   } else {
       return finishAndCleanup(request, false, aggregatedGroupIds, txId);
   }
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.table.distributed.schema.FullTableSchema;
+import org.apache.ignite.internal.table.distributed.schema.Schemas;
+import org.apache.ignite.internal.table.distributed.schema.TableDefinitionDiff;
+import org.apache.ignite.internal.tx.TransactionIds;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Validates schema compatibility.
+ */
+class SchemaCompatValidator {
+    private final Schemas schemas;
+
+    SchemaCompatValidator(Schemas schemas) {
+        this.schemas = schemas;
+    }
+
+    /**
+     * Performs forward compatibility validation (if needed). That is, for 
each table enlisted in the transaction,
+     * checks to see whether the initial schema (identified by the begin 
timestamp) is forward-compatible with the
+     * commit schema (identified by the commit timestamp).
+     *
+     * <p>If doing an abort (and not commit), the validation always succeeds.
+     *
+     * @param txId ID of the transaction that gets validated.
+     * @param enlistedGroupIds IDs of the partitions that are enlisted with 
the transaction.
+     * @param commit Whether it's a commit attempt (otherwise it's an abort).
+     * @param commitTimestamp Commit timestamp (or {@code null} if it's an 
abort).
+     * @return Future completed with validation result.
+     */
+    CompletableFuture<ForwardValidationResult> validateForwards(
+            UUID txId,
+            List<TablePartitionId> enlistedGroupIds,
+            boolean commit,
+            @Nullable HybridTimestamp commitTimestamp
+    ) {
+        if (!commit) {
+            return completedFuture(ForwardValidationResult.success());
+        }
+
+        HybridTimestamp beginTimestamp = TransactionIds.beginTimestamp(txId);
+
+        List<UUID> tableIds = enlistedGroupIds.stream()

Review Comment:
   Or, if you still like your `for` loop, I think it's better to use a `Set` 
instead of `distinct().toList()`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to