sashapolo commented on code in PR #2761:
URL: https://github.com/apache/ignite-3/pull/2761#discussion_r1375858483


##########
modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java:
##########
@@ -95,17 +101,40 @@ protected final <T> T sync(CompletableFuture<T> fut) {
      * @return Whatever the action returns.
      */
     protected final <T> CompletableFuture<T> withSchemaSync(@Nullable 
Transaction tx, KvAction<T> action) {
-        // TODO: IGNITE-20106 - retry if our request is rejected by the server 
due to a changed schema version.
+        return withSchemaSync(tx, null, action);
+    }
 
+    private <T> CompletableFuture<T> withSchemaSync(@Nullable Transaction tx, 
@Nullable Integer previousSchemaVersion, KvAction<T> action) {
         CompletableFuture<Integer> schemaVersionFuture = tx == null
                 ? schemaVersions.schemaVersionAtNow(tbl.tableId())
                 : schemaVersions.schemaVersionAt(((InternalTransaction) 
tx).startTimestamp(), tbl.tableId());
 
-        CompletableFuture<T> future = 
schemaVersionFuture.thenCompose(action::act);
+        CompletableFuture<T> future = schemaVersionFuture
+                .thenCompose(schemaVersion -> {
+                    return action.act(schemaVersion)
+                            .handle((BiFunction<T, Throwable, 
CompletableFuture<T>>) (res, ex) -> {

Review Comment:
   In my proposal I also provided a way to get rid of the cast to `BiFunction`, 
shall we use it?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -472,34 +472,30 @@ private CompletableFuture<?> 
processRequest(ReplicaRequest request, @Nullable Bo
             }
         }
 
-        return waitForSchemasBeforeReading(request)
-                .thenCompose(unused -> validateTableExistence(request))
-                .thenCompose(opStartTimestamp -> 
processOperationRequest(request, isPrimary, senderId, opStartTimestamp));
+        HybridTimestamp opTsIfDirectRo = (request instanceof 
ReadOnlyDirectReplicaRequest) ? hybridClock.now() : null;
+
+        return validateTableExistence(request, opTsIfDirectRo)
+                .thenCompose(unused -> validateSchemaMatch(request, 
opTsIfDirectRo))
+                .thenCompose(unused -> waitForSchemasBeforeReading(request, 
opTsIfDirectRo))
+                .thenCompose(opStartTimestamp -> 
processOperationRequest(request, isPrimary, senderId, opTsIfDirectRo));
     }
 
     /**
-     * Makes sure that we have schemas corresponding to the moment of tx 
start; this makes PK extraction safe WRT
-     * {@link org.apache.ignite.internal.schema.SchemaRegistry#schema(int)}.
+     * Validates that the table exists at a timestamp corresponding to the 
request operation.
      *
-     * @param request Request that's being processed.
+     * <ul>
+     *     <li>For an RW read/write, it's 'now'</li>
+     *     <li>For an RO read (with readTimestamp), it's readTimestamp 
(matches readTimestamp in the transaction)</li>
+     *     <li>For an RO direct read, it's the timestamp chosen (as 'now') to 
process the request</li>

Review Comment:
   Please specify that it is an "RO transaction", like you did in other places



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -520,18 +518,84 @@ private CompletableFuture<HybridTimestamp> 
validateTableExistence(ReplicaRequest
         }
 
         return schemaSyncService.waitForMetadataCompleteness(opStartTs)
-                .thenApply(unused -> {
-                    schemaCompatValidator.failIfTableDoesNotExistAt(opStartTs, 
tableId());
+                .thenRun(() -> 
schemaCompatValidator.failIfTableDoesNotExistAt(opStartTs, tableId()));
+    }
+
+    /**
+     * Makes sure that {@link 
SchemaVersionAwareReplicaRequest#schemaVersion()} sent in a request matches 
table schema version
+     * corresponding to the operation.
+     *
+     * @param request Replica request corresponding to the operation.
+     * @param opTsIfDirectRo Operation timestamp for a direct RO, {@code null} 
otherwise.
+     * @return Future completed when the validation is finished.
+     */
+    private CompletableFuture<Void> validateSchemaMatch(ReplicaRequest 
request, @Nullable HybridTimestamp opTsIfDirectRo) {
+        if (!(request instanceof SchemaVersionAwareReplicaRequest)) {
+            return completedFuture(null);
+        }
+
+        SchemaVersionAwareReplicaRequest versionAwareRequest = 
(SchemaVersionAwareReplicaRequest) request;
+
+        HybridTimestamp tsToWaitForSchema = getTxStartTimestamp(request);

Review Comment:
   as you wish



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to