sashapolo commented on code in PR #2761:
URL: https://github.com/apache/ignite-3/pull/2761#discussion_r1375858483
##########
modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java:
##########
@@ -95,17 +101,40 @@ protected final <T> T sync(CompletableFuture<T> fut) {
* @return Whatever the action returns.
*/
protected final <T> CompletableFuture<T> withSchemaSync(@Nullable Transaction tx, KvAction<T> action) {
- // TODO: IGNITE-20106 - retry if our request is rejected by the server due to a changed schema version.
+ return withSchemaSync(tx, null, action);
+ }
+ private <T> CompletableFuture<T> withSchemaSync(@Nullable Transaction tx, @Nullable Integer previousSchemaVersion, KvAction<T> action) {
CompletableFuture<Integer> schemaVersionFuture = tx == null
? schemaVersions.schemaVersionAtNow(tbl.tableId())
: schemaVersions.schemaVersionAt(((InternalTransaction) tx).startTimestamp(), tbl.tableId());
- CompletableFuture<T> future = schemaVersionFuture.thenCompose(action::act);
+ CompletableFuture<T> future = schemaVersionFuture
+ .thenCompose(schemaVersion -> {
+ return action.act(schemaVersion)
+ .handle((BiFunction<T, Throwable, CompletableFuture<T>>) (res, ex) -> {
Review Comment:
In my proposal I also provided a way to get rid of the cast to `BiFunction`.
Shall we use it?
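
For illustration only: the proposal mentioned above is not quoted in this message, so the sketch below shows just one common way to drop such a cast, and it is not necessarily the one that was proposed. It gives `handle` an explicit type argument and then flattens the nested future with `thenCompose`. The class `RetryWithoutCast`, the method `actWithRetry`, the `SchemaMismatchException` type and the `schemaVersion + 1` retry step are hypothetical stand-ins, not Ignite code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.IntFunction;

final class RetryWithoutCast {
    /** Hypothetical stand-in for a "schema version changed" rejection; not the real Ignite exception. */
    static class SchemaMismatchException extends RuntimeException {
        SchemaMismatchException(String message) {
            super(message);
        }
    }

    /**
     * Runs the action and retries it on a schema mismatch.
     * The explicit type argument on handle() tells the compiler the lambda's
     * result type, so no cast of the lambda to BiFunction is needed.
     */
    static <T> CompletableFuture<T> actWithRetry(
            IntFunction<CompletableFuture<T>> action, int schemaVersion, int attemptsLeft) {
        return action.apply(schemaVersion)
                .<CompletableFuture<T>>handle((res, ex) -> {
                    if (ex == null) {
                        return CompletableFuture.completedFuture(res);
                    }

                    // Dependent stages may wrap the original failure in CompletionException.
                    Throwable cause = ex instanceof CompletionException && ex.getCause() != null
                            ? ex.getCause()
                            : ex;

                    if (cause instanceof SchemaMismatchException && attemptsLeft > 0) {
                        // Assumption: real code would re-read the current schema version
                        // instead of simply incrementing it.
                        return actWithRetry(action, schemaVersion + 1, attemptsLeft - 1);
                    }

                    return CompletableFuture.failedFuture(cause);
                })
                // Flatten CompletableFuture<CompletableFuture<T>> into CompletableFuture<T>.
                .thenCompose(nested -> nested);
    }

    public static void main(String[] args) {
        // The fake action rejects schema versions below 3.
        CompletableFuture<String> result = actWithRetry(
                v -> v < 3
                        ? CompletableFuture.<String>failedFuture(new SchemaMismatchException("stale schema " + v))
                        : CompletableFuture.completedFuture("ok@" + v),
                1,
                3);

        System.out.println(result.join()); // prints "ok@3"
    }
}
```

The type argument fixes the result type of both the success and the retry branch of the lambda, which is the job the cast does in the diff above.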
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -472,34 +472,30 @@ private CompletableFuture<?> processRequest(ReplicaRequest request, @Nullable Bo
}
}
- return waitForSchemasBeforeReading(request)
- .thenCompose(unused -> validateTableExistence(request))
- .thenCompose(opStartTimestamp -> processOperationRequest(request, isPrimary, senderId, opStartTimestamp));
+ HybridTimestamp opTsIfDirectRo = (request instanceof ReadOnlyDirectReplicaRequest) ? hybridClock.now() : null;
+
+ return validateTableExistence(request, opTsIfDirectRo)
+ .thenCompose(unused -> validateSchemaMatch(request, opTsIfDirectRo))
+ .thenCompose(unused -> waitForSchemasBeforeReading(request, opTsIfDirectRo))
+ .thenCompose(opStartTimestamp -> processOperationRequest(request, isPrimary, senderId, opTsIfDirectRo));
}
/**
- * Makes sure that we have schemas corresponding to the moment of tx start; this makes PK extraction safe WRT
- * {@link org.apache.ignite.internal.schema.SchemaRegistry#schema(int)}.
+ * Validates that the table exists at a timestamp corresponding to the request operation.
*
- * @param request Request that's being processed.
+ * <ul>
+ * <li>For an RW read/write, it's 'now'</li>
+ * <li>For an RO read (with readTimestamp), it's readTimestamp (matches readTimestamp in the transaction)</li>
+ * <li>For an RO direct read, it's the timestamp chosen (as 'now') to process the request</li>
Review Comment:
Please specify that it is an "RO transaction", like you did in other places.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -520,18 +518,84 @@ private CompletableFuture<HybridTimestamp> validateTableExistence(ReplicaRequest
}
return schemaSyncService.waitForMetadataCompleteness(opStartTs)
- .thenApply(unused -> {
- schemaCompatValidator.failIfTableDoesNotExistAt(opStartTs, tableId());
+ .thenRun(() -> schemaCompatValidator.failIfTableDoesNotExistAt(opStartTs, tableId()));
+ }
+
+ /**
+ * Makes sure that {@link SchemaVersionAwareReplicaRequest#schemaVersion()} sent in a request matches table schema version
+ * corresponding to the operation.
+ *
+ * @param request Replica request corresponding to the operation.
+ * @param opTsIfDirectRo Operation timestamp for a direct RO, {@code null} otherwise.
+ * @return Future completed when the validation is finished.
+ */
+ private CompletableFuture<Void> validateSchemaMatch(ReplicaRequest request, @Nullable HybridTimestamp opTsIfDirectRo) {
+ if (!(request instanceof SchemaVersionAwareReplicaRequest)) {
+ return completedFuture(null);
+ }
+
+ SchemaVersionAwareReplicaRequest versionAwareRequest = (SchemaVersionAwareReplicaRequest) request;
+
+ HybridTimestamp tsToWaitForSchema = getTxStartTimestamp(request);
Review Comment:
As you wish.