This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 5d195d48d9 IGNITE-19837 Java client: Retry outdated schema error 
(#2381)
5d195d48d9 is described below

commit 5d195d48d9c2a294526a5d5c5b2a63ea79b0dae3
Author: Pavel Tupitsyn <ptupit...@apache.org>
AuthorDate: Mon Jul 31 18:43:49 2023 +0300

    IGNITE-19837 Java client: Retry outdated schema error (#2381)
    
    * Handle `SCHEMA_VERSION_MISMATCH_ERR` error code in Java client as a 
special case
      * Rethrow as internal `ClientSchemaVersionMismatchException`
      * Catch on Table level, retry operations with specified schema
    * Fix `SchemaVersionMismatchException` detection in 
`ClientInboundMessageHandler`
    
    Additionally, fix IGNITE-20103 - a separate fix would be difficult to test. 
When schema changes (due to ALTER TABLE), old rows are wrapped into 
`UpgradingRowAdapter`, which was not expected in `ClientTableCommon.writeTuple`:
    * Fix `UpgradingRowAdapter` to avoid returning incorrect `BinaryTuple`
    * Handle null `BinaryTuple` in `ClientTableCommon.writeTuple`
---
 .../org/apache/ignite/lang/IgniteException.java    |   2 +-
 .../client/proto/ClientBinaryTupleUtils.java       |  95 +++++++++++++++++++
 .../handler/ClientInboundMessageHandler.java       |  19 +++-
 .../handler/requests/table/ClientTableCommon.java  |  25 ++++-
 .../ClientSchemaVersionMismatchException.java      |  57 ++++++++++++
 .../ignite/internal/client/TcpClientChannel.java   |  27 +++++-
 .../ignite/internal/client/table/ClientTable.java  | 103 +++++++++++++++++++--
 .../client/table/ClientTupleSerializer.java        |  94 +------------------
 .../ignite/internal/util/ExceptionUtils.java       |   2 +-
 .../ignite/lang/IgniteInternalException.java       |   2 +-
 .../ItThinClientSchemaSynchronizationTest.java     |  66 ++++++++++---
 .../streamer/ItAbstractDataStreamerTest.java       |  52 +++++++++--
 .../schema/registry/UpgradingRowAdapter.java       |   8 ++
 13 files changed, 417 insertions(+), 135 deletions(-)

diff --git 
a/modules/api/src/main/java/org/apache/ignite/lang/IgniteException.java 
b/modules/api/src/main/java/org/apache/ignite/lang/IgniteException.java
index a6120acbe7..1a31df9d6d 100644
--- a/modules/api/src/main/java/org/apache/ignite/lang/IgniteException.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/IgniteException.java
@@ -181,7 +181,7 @@ public class IgniteException extends RuntimeException 
implements TraceableExcept
      * @param message Detailed message.
      * @param cause Optional nested exception (can be {@code null}).
      */
-    public IgniteException(UUID traceId, int code, String message, Throwable 
cause) {
+    public IgniteException(UUID traceId, int code, String message, @Nullable 
Throwable cause) {
         super(message, cause);
 
         this.traceId = traceId;
diff --git 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientBinaryTupleUtils.java
 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientBinaryTupleUtils.java
index 9d464eb4c4..d3eaf81bc5 100644
--- 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientBinaryTupleUtils.java
+++ 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientBinaryTupleUtils.java
@@ -33,6 +33,7 @@ import 
org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
 import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.sql.ColumnType;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Client binary tuple utils.
@@ -192,6 +193,100 @@ public class ClientBinaryTupleUtils {
         }
     }
 
+    /**
+     * Writes a column value to the binary tuple.
+     *
+     * @param builder Builder.
+     * @param type Column type.
+     * @param name Column name.
+     * @param scale Scale.
+     * @param v Value.
+     */
+    public static void appendValue(BinaryTupleBuilder builder, ColumnType 
type, String name, int scale, @Nullable Object v) {
+        if (v == null) {
+            builder.appendNull();
+            return;
+        }
+
+        try {
+            switch (type) {
+                case BOOLEAN:
+                    builder.appendBoolean((boolean) v);
+                    return;
+
+                case INT8:
+                    builder.appendByte((byte) v);
+                    return;
+
+                case INT16:
+                    builder.appendShort((short) v);
+                    return;
+
+                case INT32:
+                    builder.appendInt((int) v);
+                    return;
+
+                case INT64:
+                    builder.appendLong((long) v);
+                    return;
+
+                case FLOAT:
+                    builder.appendFloat((float) v);
+                    return;
+
+                case DOUBLE:
+                    builder.appendDouble((double) v);
+                    return;
+
+                case DECIMAL:
+                    builder.appendDecimalNotNull((BigDecimal) v, scale);
+                    return;
+
+                case UUID:
+                    builder.appendUuidNotNull((UUID) v);
+                    return;
+
+                case STRING:
+                    builder.appendStringNotNull((String) v);
+                    return;
+
+                case BYTE_ARRAY:
+                    builder.appendBytesNotNull((byte[]) v);
+                    return;
+
+                case BITMASK:
+                    builder.appendBitmaskNotNull((BitSet) v);
+                    return;
+
+                case DATE:
+                    builder.appendDateNotNull((LocalDate) v);
+                    return;
+
+                case TIME:
+                    builder.appendTimeNotNull((LocalTime) v);
+                    return;
+
+                case DATETIME:
+                    builder.appendDateTimeNotNull((LocalDateTime) v);
+                    return;
+
+                case TIMESTAMP:
+                    builder.appendTimestampNotNull((Instant) v);
+                    return;
+
+                case NUMBER:
+                    builder.appendNumberNotNull((BigInteger) v);
+                    return;
+
+                default:
+                    throw new IllegalArgumentException("Unsupported type: " + 
type);
+            }
+        } catch (ClassCastException e) {
+            throw new IgniteException(PROTOCOL_ERR, "Incorrect value type for 
column '" + name + "': " + e.getMessage(), e);
+        }
+    }
+
+
     private static void appendTypeAndScale(BinaryTupleBuilder builder, 
ColumnType type, int scale) {
         builder.appendInt(type.ordinal());
         builder.appendInt(scale);
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index f41ebf8e8c..ef195a296d 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -396,7 +396,8 @@ public class ClientInboundMessageHandler extends 
ChannelInboundHandlerAdapter im
     }
 
     private void writeErrorCore(Throwable err, ClientMessagePacker packer) {
-        err = ExceptionUtils.unwrapCause(err);
+        SchemaVersionMismatchException schemaVersionMismatchException = 
schemaVersionMismatchException(err);
+        err = schemaVersionMismatchException == null ? 
ExceptionUtils.unwrapCause(err) : schemaVersionMismatchException;
 
         // Trace ID and error code.
         if (err instanceof TraceableException) {
@@ -420,10 +421,10 @@ public class ClientInboundMessageHandler extends 
ChannelInboundHandlerAdapter im
         }
 
         // Extensions.
-        if (err instanceof SchemaVersionMismatchException) {
+        if (schemaVersionMismatchException != null) {
             packer.packMapHeader(1);
             packer.packString(ErrorExtensions.EXPECTED_SCHEMA_VERSION);
-            packer.packInt(((SchemaVersionMismatchException) 
err).expectedVersion());
+            packer.packInt(schemaVersionMismatchException.expectedVersion());
         } else {
             packer.packNil(); // No extensions.
         }
@@ -711,4 +712,16 @@ public class ClientInboundMessageHandler extends 
ChannelInboundHandlerAdapter im
             throw new IllegalArgumentException("Unsupported extension type: " 
+ type.getName());
         }
     }
+
+    private static @Nullable SchemaVersionMismatchException 
schemaVersionMismatchException(Throwable e) {
+        while (e != null) {
+            if (e instanceof SchemaVersionMismatchException) {
+                return (SchemaVersionMismatchException) e;
+            }
+
+            e = e.getCause();
+        }
+
+        return null;
+    }
 }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
index d04e4e1ba7..0c05718307 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.client.handler.requests.table;
 
+import static 
org.apache.ignite.internal.client.proto.ClientMessageCommon.NO_VALUE;
 import static org.apache.ignite.lang.ErrorGroups.Client.PROTOCOL_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Client.TABLE_ID_NOT_FOUND_ERR;
 
@@ -24,8 +25,10 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.client.handler.ClientResourceRegistry;
+import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
 import org.apache.ignite.internal.binarytuple.BinaryTupleContainer;
 import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
+import org.apache.ignite.internal.client.proto.ClientBinaryTupleUtils;
 import org.apache.ignite.internal.client.proto.ClientMessagePacker;
 import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
 import org.apache.ignite.internal.client.proto.TuplePart;
@@ -132,13 +135,27 @@ public class ClientTableCommon {
 
         assert tuple instanceof BinaryTupleContainer : "Tuple must be a 
BinaryTupleContainer: " + tuple.getClass();
         BinaryTupleReader binaryTuple = ((BinaryTupleContainer) 
tuple).binaryTuple();
-        assert binaryTuple != null : "Binary tuple must not be null: " + 
tuple.getClass();
 
         int elementCount = part == TuplePart.KEY ? 
schema.keyColumns().length() : schema.length();
-        assert elementCount == binaryTuple.elementCount() :
-                "Tuple element count mismatch: " + elementCount + " != " + 
binaryTuple.elementCount();
 
-        packer.packBinaryTuple(binaryTuple);
+        if (binaryTuple != null) {
+            assert elementCount == binaryTuple.elementCount() :
+                    "Tuple element count mismatch: " + elementCount + " != " + 
binaryTuple.elementCount() + " (" + tuple.getClass() + ")";
+
+            packer.packBinaryTuple(binaryTuple);
+        } else {
+            // Underlying binary tuple is not available or can't be used as 
is, pack columns one by one.
+            var builder = new BinaryTupleBuilder(elementCount);
+
+            for (var i = 0; i < elementCount; i++) {
+                var col = schema.column(i);
+                Object v = tuple.valueOrDefault(col.name(), NO_VALUE);
+
+                ClientBinaryTupleUtils.appendValue(builder, 
getColumnType(col.type().spec()), col.name(), getDecimalScale(col.type()), v);
+            }
+
+            packer.packBinaryTuple(builder);
+        }
     }
 
     /**
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientSchemaVersionMismatchException.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientSchemaVersionMismatchException.java
new file mode 100644
index 0000000000..ff9bb0ec7f
--- /dev/null
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientSchemaVersionMismatchException.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client;
+
+import java.util.UUID;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Indicates incompatible schema version.
+ */
+public class ClientSchemaVersionMismatchException extends 
IgniteInternalException {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 0L;
+
+    /** Expected schema version. */
+    private final int expectedVersion;
+
+    /**
+     * Constructor.
+     *
+     * @param traceId Trace ID.
+     * @param code Error code.
+     * @param message String message.
+     * @param expectedVersion Expected schema version.
+     * @param cause Cause.
+     */
+    ClientSchemaVersionMismatchException(UUID traceId, int code, @Nullable 
String message, int expectedVersion, @Nullable Throwable cause) {
+        super(traceId, code, message, cause);
+
+        this.expectedVersion = expectedVersion;
+    }
+
+    /**
+     * Gets expected schema version.
+     *
+     * @return Expected schema version.
+     */
+    public int expectedVersion() {
+        return expectedVersion;
+    }
+}
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index 599e888332..61f72bbebb 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -51,12 +51,14 @@ import 
org.apache.ignite.internal.client.proto.ClientMessageCommon;
 import org.apache.ignite.internal.client.proto.ClientMessagePacker;
 import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
 import org.apache.ignite.internal.client.proto.ClientOp;
+import org.apache.ignite.internal.client.proto.ErrorExtensions;
 import org.apache.ignite.internal.client.proto.HandshakeExtension;
 import org.apache.ignite.internal.client.proto.ProtocolVersion;
 import org.apache.ignite.internal.client.proto.ResponseFlags;
 import org.apache.ignite.internal.client.proto.ServerMessageType;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.lang.ErrorGroups.Table;
 import org.apache.ignite.lang.IgniteCheckedException;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.IgniteExceptionUtils;
@@ -427,13 +429,34 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
     private static <T extends Throwable> T readError(ClientMessageUnpacker 
unpacker) {
         var traceId = unpacker.unpackUuid();
         var code = unpacker.unpackInt();
+
         var errClassName = unpacker.unpackString();
         var errMsg = unpacker.tryUnpackNil() ? null : unpacker.unpackString();
 
         IgniteException causeWithStackTrace = unpacker.tryUnpackNil() ? null : 
new IgniteException(traceId, code, unpacker.unpackString());
 
-        // TODO IGNITE-19837 Retry outdated schema error
-        unpacker.skipValues(1); // Error extensions.
+        if (code == Table.SCHEMA_VERSION_MISMATCH_ERR) {
+            int extSize = unpacker.tryUnpackNil() ? 0 : 
unpacker.unpackMapHeader();
+            int expectedSchemaVersion = -1;
+
+            for (int i = 0; i < extSize; i++) {
+                String key = unpacker.unpackString();
+
+                if (key.equals(ErrorExtensions.EXPECTED_SCHEMA_VERSION)) {
+                    expectedSchemaVersion = unpacker.unpackInt();
+                } else {
+                    // Unknown extension - ignore.
+                    unpacker.skipValues(1);
+                }
+            }
+
+            if (expectedSchemaVersion == -1) {
+                return (T) new IgniteException(
+                        traceId, PROTOCOL_ERR, "Expected schema version is not 
specified in error extension map.", causeWithStackTrace);
+            }
+
+            return (T) new ClientSchemaVersionMismatchException(traceId, code, 
errMsg, expectedSchemaVersion, causeWithStackTrace);
+        }
 
         try {
             // TODO https://issues.apache.org/jira/browse/IGNITE-19539
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
index e71e6cf786..b31328a824 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
@@ -30,6 +30,7 @@ import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import org.apache.ignite.client.RetryPolicy;
+import org.apache.ignite.internal.client.ClientSchemaVersionMismatchException;
 import org.apache.ignite.internal.client.ClientUtils;
 import org.apache.ignite.internal.client.PayloadOutputChannel;
 import org.apache.ignite.internal.client.ReliableChannel;
@@ -79,8 +80,8 @@ public class ClientTable implements Table {
     /**
      * Constructor.
      *
-     * @param ch   Channel.
-     * @param id   Table id.
+     * @param ch Channel.
+     * @param id Table id.
      * @param name Table name.
      */
     public ClientTable(ReliableChannel ch, int id, String name) {
@@ -259,7 +260,7 @@ public class ClientTable implements Table {
     /**
      * Writes transaction, if present.
      *
-     * @param tx  Transaction.
+     * @param tx Transaction.
      * @param out Packer.
      */
     public static void writeTx(@Nullable Transaction tx, PayloadOutputChannel 
out) {
@@ -284,12 +285,25 @@ public class ClientTable implements Table {
             @Nullable T defaultValue,
             @Nullable PartitionAwarenessProvider provider
     ) {
-        CompletableFuture<ClientSchema> schemaFut = getLatestSchema();
+        return doSchemaOutInOpAsync(opCode, writer, reader, defaultValue, 
provider, null);
+    }
+
+    private <T> CompletableFuture<T> doSchemaOutInOpAsync(
+            int opCode,
+            BiConsumer<ClientSchema, PayloadOutputChannel> writer,
+            BiFunction<ClientSchema, ClientMessageUnpacker, T> reader,
+            @Nullable T defaultValue,
+            @Nullable PartitionAwarenessProvider provider,
+            @Nullable Integer schemaVersionOverride
+    ) {
+        CompletableFuture<T> fut = new CompletableFuture<>();
+
+        CompletableFuture<ClientSchema> schemaFut = 
getSchema(schemaVersionOverride == null ? latestSchemaVer : 
schemaVersionOverride);
         CompletableFuture<List<String>> partitionsFut = provider == null || 
!provider.isPartitionAwarenessEnabled()
                 ? CompletableFuture.completedFuture(null)
                 : getPartitionAssignment();
 
-        return CompletableFuture.allOf(schemaFut, partitionsFut)
+        CompletableFuture.allOf(schemaFut, partitionsFut)
                 .thenCompose(v -> {
                     ClientSchema schema = schemaFut.getNow(null);
                     String preferredNodeName = getPreferredNodeName(provider, 
partitionsFut.getNow(null), schema);
@@ -300,7 +314,30 @@ public class ClientTable implements Table {
                             preferredNodeName,
                             null);
                 })
-                .thenCompose(t -> loadSchemaAndReadData(t, reader));
+                .thenCompose(t -> loadSchemaAndReadData(t, reader))
+                .whenComplete((res, err) -> {
+                    if (err != null) {
+                        if (err.getCause() instanceof 
ClientSchemaVersionMismatchException) {
+                            // Retry with specific schema version.
+                            int expectedVersion = 
((ClientSchemaVersionMismatchException) err.getCause()).expectedVersion();
+
+                            doSchemaOutInOpAsync(opCode, writer, reader, 
defaultValue, provider, expectedVersion)
+                                    .whenComplete((res0, err0) -> {
+                                        if (err0 != null) {
+                                            fut.completeExceptionally(err0);
+                                        } else {
+                                            fut.complete(res0);
+                                        }
+                                    });
+                        } else {
+                            fut.completeExceptionally(err);
+                        }
+                    } else {
+                        fut.complete(res);
+                    }
+                });
+
+        return fut;
     }
 
     /**
@@ -318,7 +355,7 @@ public class ClientTable implements Table {
             BiConsumer<ClientSchema, PayloadOutputChannel> writer,
             Function<ClientMessageUnpacker, T> reader,
             @Nullable PartitionAwarenessProvider provider) {
-        return doSchemaOutOpAsync(opCode, writer, reader, provider, null);
+        return doSchemaOutOpAsync(opCode, writer, reader, provider, null, 
null);
     }
 
     /**
@@ -331,19 +368,42 @@ public class ClientTable implements Table {
      * @param <T> Result type.
      * @return Future representing pending completion of the operation.
      */
-    public <T> CompletableFuture<T> doSchemaOutOpAsync(
+    <T> CompletableFuture<T> doSchemaOutOpAsync(
             int opCode,
             BiConsumer<ClientSchema, PayloadOutputChannel> writer,
             Function<ClientMessageUnpacker, T> reader,
             @Nullable PartitionAwarenessProvider provider,
             @Nullable RetryPolicy retryPolicyOverride) {
+        return doSchemaOutOpAsync(opCode, writer, reader, provider, 
retryPolicyOverride, null);
+    }
+
+    /**
+     * Performs a schema-based operation.
+     *
+     * @param opCode Op code.
+     * @param writer Writer.
+     * @param reader Reader.
+     * @param provider Partition awareness provider.
+     * @param retryPolicyOverride Retry policy override.
+     * @param schemaVersionOverride Schema version override.
+     * @param <T> Result type.
+     * @return Future representing pending completion of the operation.
+     */
+    private <T> CompletableFuture<T> doSchemaOutOpAsync(
+            int opCode,
+            BiConsumer<ClientSchema, PayloadOutputChannel> writer,
+            Function<ClientMessageUnpacker, T> reader,
+            @Nullable PartitionAwarenessProvider provider,
+            @Nullable RetryPolicy retryPolicyOverride,
+            @Nullable Integer schemaVersionOverride) {
+        CompletableFuture<T> fut = new CompletableFuture<>();
 
-        CompletableFuture<ClientSchema> schemaFut = getLatestSchema();
+        CompletableFuture<ClientSchema> schemaFut = 
getSchema(schemaVersionOverride == null ? latestSchemaVer : 
schemaVersionOverride);
         CompletableFuture<List<String>> partitionsFut = provider == null || 
!provider.isPartitionAwarenessEnabled()
                 ? CompletableFuture.completedFuture(null)
                 : getPartitionAssignment();
 
-        return CompletableFuture.allOf(schemaFut, partitionsFut)
+        CompletableFuture.allOf(schemaFut, partitionsFut)
                 .thenCompose(v -> {
                     ClientSchema schema = schemaFut.getNow(null);
                     String preferredNodeName = getPreferredNodeName(provider, 
partitionsFut.getNow(null), schema);
@@ -357,7 +417,30 @@ public class ClientTable implements Table {
                             },
                             preferredNodeName,
                             retryPolicyOverride);
+                })
+                .whenComplete((res, err) -> {
+                    if (err != null) {
+                        if (err.getCause() instanceof 
ClientSchemaVersionMismatchException) {
+                            // Retry with specific schema version.
+                            int expectedVersion = 
((ClientSchemaVersionMismatchException) err.getCause()).expectedVersion();
+
+                            doSchemaOutOpAsync(opCode, writer, reader, 
provider, retryPolicyOverride, expectedVersion)
+                                    .whenComplete((res0, err0) -> {
+                                        if (err0 != null) {
+                                            fut.completeExceptionally(err0);
+                                        } else {
+                                            fut.complete(res0);
+                                        }
+                                    });
+                        } else {
+                            fut.completeExceptionally(err);
+                        }
+                    } else {
+                        fut.complete(res);
+                    }
                 });
+
+        return fut;
     }
 
     private <T> @Nullable Object readSchemaAndReadData(
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTupleSerializer.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTupleSerializer.java
index 23341d96f6..f8199d31b3 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTupleSerializer.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTupleSerializer.java
@@ -19,14 +19,7 @@ package org.apache.ignite.internal.client.table;
 
 import static 
org.apache.ignite.internal.client.proto.ClientMessageCommon.NO_VALUE;
 import static org.apache.ignite.internal.client.table.ClientTable.writeTx;
-import static org.apache.ignite.lang.ErrorGroups.Client.PROTOCOL_ERR;
-
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.time.Instant;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.LocalTime;
+
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Collection;
@@ -34,16 +27,15 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.UUID;
 import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
 import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
 import org.apache.ignite.internal.client.PayloadOutputChannel;
+import org.apache.ignite.internal.client.proto.ClientBinaryTupleUtils;
 import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
 import org.apache.ignite.internal.client.proto.TuplePart;
 import org.apache.ignite.internal.client.tx.ClientTransaction;
 import org.apache.ignite.internal.util.HashCalculator;
 import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.table.mapper.Mapper;
 import org.apache.ignite.tx.Transaction;
@@ -316,93 +308,13 @@ public class ClientTupleSerializer {
     }
 
     private static void appendValue(BinaryTupleBuilder builder, BitSet 
noValueSet, ClientColumn col, Object v) {
-        if (v == null) {
-            builder.appendNull();
-            return;
-        }
-
         if (v == NO_VALUE) {
             noValueSet.set(col.schemaIndex());
             builder.appendNull();
             return;
         }
 
-        try {
-            switch (col.type()) {
-                case BOOLEAN:
-                    builder.appendBoolean((boolean) v);
-                    return;
-
-                case INT8:
-                    builder.appendByte((byte) v);
-                    return;
-
-                case INT16:
-                    builder.appendShort((short) v);
-                    return;
-
-                case INT32:
-                    builder.appendInt((int) v);
-                    return;
-
-                case INT64:
-                    builder.appendLong((long) v);
-                    return;
-
-                case FLOAT:
-                    builder.appendFloat((float) v);
-                    return;
-
-                case DOUBLE:
-                    builder.appendDouble((double) v);
-                    return;
-
-                case DECIMAL:
-                    builder.appendDecimalNotNull((BigDecimal) v, col.scale());
-                    return;
-
-                case UUID:
-                    builder.appendUuidNotNull((UUID) v);
-                    return;
-
-                case STRING:
-                    builder.appendStringNotNull((String) v);
-                    return;
-
-                case BYTE_ARRAY:
-                    builder.appendBytesNotNull((byte[]) v);
-                    return;
-
-                case BITMASK:
-                    builder.appendBitmaskNotNull((BitSet) v);
-                    return;
-
-                case DATE:
-                    builder.appendDateNotNull((LocalDate) v);
-                    return;
-
-                case TIME:
-                    builder.appendTimeNotNull((LocalTime) v);
-                    return;
-
-                case DATETIME:
-                    builder.appendDateTimeNotNull((LocalDateTime) v);
-                    return;
-
-                case TIMESTAMP:
-                    builder.appendTimestampNotNull((Instant) v);
-                    return;
-
-                case NUMBER:
-                    builder.appendNumberNotNull((BigInteger) v);
-                    return;
-
-                default:
-                    throw new IllegalArgumentException("Unsupported type: " + 
col.type());
-            }
-        } catch (ClassCastException e) {
-            throw new IgniteException(PROTOCOL_ERR, "Incorrect value type for 
column '" + col.name() + "': " + e.getMessage(), e);
-        }
+        ClientBinaryTupleUtils.appendValue(builder, col.type(), col.name(), 
col.scale(), v);
     }
 
     /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
index 3c6f88f80e..c722f505d7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
@@ -449,7 +449,7 @@ public final class ExceptionUtils {
      * @param t Throwable to extract a trace identifier.
      * @return Returns trace identifier.
      */
-    public static UUID getOrCreateTraceId(Throwable t) {
+    public static UUID getOrCreateTraceId(@Nullable Throwable t) {
         Throwable e = t;
 
         // This collection is used to avoid infinite loops in case of cyclic 
dependencies.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/lang/IgniteInternalException.java
 
b/modules/core/src/main/java/org/apache/ignite/lang/IgniteInternalException.java
index 974b3ff2a1..73f5b04b6d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/lang/IgniteInternalException.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/lang/IgniteInternalException.java
@@ -142,7 +142,7 @@ public class IgniteInternalException extends 
RuntimeException implements Traceab
      * @param message Detail message.
      * @param cause Optional nested exception (can be {@code null}).
      */
-    public IgniteInternalException(UUID traceId, int code, String message, 
@Nullable Throwable cause) {
+    public IgniteInternalException(UUID traceId, int code, @Nullable String 
message, @Nullable Throwable cause) {
         super(message, cause);
 
         this.traceId = traceId;
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSchemaSynchronizationTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSchemaSynchronizationTest.java
index 1be0a64acd..e50b50e944 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSchemaSynchronizationTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSchemaSynchronizationTest.java
@@ -17,12 +17,10 @@
 
 package org.apache.ignite.internal.runner.app.client;
 
-import static org.hamcrest.CoreMatchers.containsString;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 
 import org.apache.ignite.client.IgniteClient;
-import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.sql.Session;
 import org.apache.ignite.table.RecordView;
 import org.apache.ignite.table.Tuple;
@@ -31,15 +29,15 @@ import org.junit.jupiter.api.Test;
 /**
  * Tests for client schema synchronization.
  */
+@SuppressWarnings("resource")
 public class ItThinClientSchemaSynchronizationTest extends 
ItAbstractThinClientTest {
-    @SuppressWarnings("resource")
     @Test
-    void testOutdatedSchemaFromClientThrowsExceptionOnServer() throws 
InterruptedException {
+    void testClientUsesLatestSchemaOnWrite() throws InterruptedException {
         IgniteClient client = client();
         Session ses = client.sql().createSession();
 
         // Create table, insert data.
-        String tableName = 
"testOutdatedSchemaFromClientThrowsExceptionOnServer";
+        String tableName = "testClientUsesLatestSchemaOnWrite";
         ses.execute(null, "CREATE TABLE " + tableName + "(ID INT NOT NULL 
PRIMARY KEY)");
 
         waitForTableOnAllNodes(tableName);
@@ -48,11 +46,55 @@ public class ItThinClientSchemaSynchronizationTest extends 
ItAbstractThinClientT
         Tuple rec = Tuple.create().set("ID", 1);
         recordView.insert(null, rec);
 
-        // Modify table, get data - client will use old schema.
-        ses.execute(null, "ALTER TABLE 
testOutdatedSchemaFromClientThrowsExceptionOnServer ADD COLUMN NAME VARCHAR");
+        // Modify table, insert data - client will use old schema, receive 
error, retry with new schema.
+        // The process is transparent for the user: updated schema is in 
effect immediately.
+        ses.execute(null, "ALTER TABLE " + tableName + " ADD COLUMN NAME 
VARCHAR NOT NULL");
 
-        // TODO IGNITE-19837 Retry outdated schema error
-        IgniteException ex = assertThrows(IgniteException.class, () -> 
recordView.insert(null, rec));
-        assertThat(ex.getMessage(), containsString("Schema version mismatch 
[expectedVer=2, actualVer=1]"));
+        Tuple rec2 = Tuple.create().set("ID", 1).set("NAME", "name");
+        recordView.upsert(null, rec2);
+
+        assertEquals("name", recordView.get(null, rec).stringValue(1));
+    }
+
+    @Test
+    void testClientUsesLatestSchemaOnRead() throws InterruptedException {
+        IgniteClient client = client();
+        Session ses = client.sql().createSession();
+
+        // Create table, insert data.
+        String tableName = "testClientUsesLatestSchemaOnRead";
+        ses.execute(null, "CREATE TABLE " + tableName + "(ID INT NOT NULL 
PRIMARY KEY)");
+
+        waitForTableOnAllNodes(tableName);
+        RecordView<Tuple> recordView = 
client.tables().table(tableName).recordView();
+
+        Tuple rec = Tuple.create().set("ID", 1);
+        recordView.insert(null, rec);
+
+        // Modify table, insert data - client will use old schema, receive 
error, retry with new schema.
+        // The process is transparent for the user: updated schema is in 
effect immediately.
+        ses.execute(null, "ALTER TABLE " + tableName + " ADD COLUMN NAME 
VARCHAR DEFAULT 'def_name'");
+        assertEquals("def_name", recordView.get(null, rec).stringValue(1));
+    }
+
+    @Test
+    void testClientUsesLatestSchemaOnReadWithNotNullColumn() throws 
InterruptedException {
+        IgniteClient client = client();
+        Session ses = client.sql().createSession();
+
+        // Create table, insert data.
+        String tableName = "testClientUsesLatestSchemaOnReadWithNotNullColumn";
+        ses.execute(null, "CREATE TABLE " + tableName + "(ID INT NOT NULL 
PRIMARY KEY)");
+
+        waitForTableOnAllNodes(tableName);
+        RecordView<Tuple> recordView = 
client.tables().table(tableName).recordView();
+
+        Tuple rec = Tuple.create().set("ID", 1);
+        recordView.insert(null, rec);
+
+        // Modify table and get old row.
+        // It still has null value in the old column, even though it is not 
allowed by the new schema.
+        ses.execute(null, "ALTER TABLE " + tableName + " ADD COLUMN NAME 
VARCHAR NOT NULL");
+        assertNull(recordView.get(null, rec).stringValue(1));
     }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
index b4e67b24ea..dda034d88c 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.SubmissionPublisher;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.internal.sql.engine.ClusterPerClassIntegrationTest;
+import org.apache.ignite.sql.Session;
 import org.apache.ignite.table.DataStreamerOptions;
 import org.apache.ignite.table.KeyValueView;
 import org.apache.ignite.table.RecordView;
@@ -144,16 +145,7 @@ public abstract class ItAbstractDataStreamerTest extends 
ClusterPerClassIntegrat
             view.streamData(publisher, options);
 
             publisher.submit(tuple(1, "foo"));
-            assertTrue(waitForCondition(() -> {
-                @SuppressWarnings("resource")
-                var tx = ignite().transactions().begin(new 
TransactionOptions().readOnly(true));
-
-                try {
-                    return view.get(tx, tupleKey(1)) != null;
-                } finally {
-                    tx.rollback();
-                }
-            }, 50, 5000));
+            waitForKey(view, tupleKey(1));
         }
     }
 
@@ -213,6 +205,46 @@ public abstract class ItAbstractDataStreamerTest extends 
ClusterPerClassIntegrat
         assertNull(view.get(null, tupleKey(10_000)));
     }
 
+    @SuppressWarnings("resource")
+    @Test
+    public void testSchemaUpdateWhileStreaming() throws InterruptedException {
+        Session ses = ignite().sql().createSession();
+
+        String tableName = "testSchemaUpdateWhileStreaming";
+        ses.execute(null, "CREATE TABLE " + tableName + "(ID INT NOT NULL 
PRIMARY KEY)");
+        RecordView<Tuple> view = 
ignite().tables().table(tableName).recordView();
+
+        CompletableFuture<Void> streamerFut;
+
+        try (var publisher = new SubmissionPublisher<Tuple>()) {
+            var options = DataStreamerOptions.builder().batchSize(1).build();
+            streamerFut = view.streamData(publisher, options);
+
+            publisher.submit(tupleKey(1));
+            waitForKey(view, tupleKey(1));
+
+            ses.execute(null, "ALTER TABLE " + tableName + " ADD COLUMN NAME 
VARCHAR NOT NULL");
+            publisher.submit(tuple(2, "bar"));
+        }
+
+        streamerFut.orTimeout(1, TimeUnit.SECONDS).join();
+
+        assertEquals("bar", view.get(null, tupleKey(2)).stringValue("name"));
+    }
+
+    private void waitForKey(RecordView<Tuple> view, Tuple key) throws 
InterruptedException {
+        assertTrue(waitForCondition(() -> {
+            @SuppressWarnings("resource")
+            var tx = ignite().transactions().begin(new 
TransactionOptions().readOnly(true));
+
+            try {
+                return view.get(tx, key) != null;
+            } finally {
+                tx.rollback();
+            }
+        }, 50, 5000));
+    }
+
     private Table defaultTable() {
         //noinspection resource
         return ignite().tables().table(TABLE_NAME);
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java
index dff0f63f1a..51924510a0 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java
@@ -26,6 +26,7 @@ import java.time.LocalTime;
 import java.util.BitSet;
 import java.util.UUID;
 import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.BinaryTupleSchema;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.InvalidTypeException;
@@ -418,4 +419,11 @@ class UpgradingRowAdapter extends Row {
 
         return mappedId < 0 ? (Instant) column.defaultValue() : 
super.timestampValue(mappedId);
     }
+
+    /** {@inheritDoc} */
+    @Override
+    public BinaryTuple binaryTuple() {
+        // Underlying binary tuple can not be used directly.
+        return null;
+    }
 }


Reply via email to