This is an automated email from the ASF dual-hosted git repository. ptupitsyn pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 5d195d48d9 IGNITE-19837 Java client: Retry outdated schema error (#2381) 5d195d48d9 is described below commit 5d195d48d9c2a294526a5d5c5b2a63ea79b0dae3 Author: Pavel Tupitsyn <ptupit...@apache.org> AuthorDate: Mon Jul 31 18:43:49 2023 +0300 IGNITE-19837 Java client: Retry outdated schema error (#2381) * Handle `SCHEMA_VERSION_MISMATCH_ERR` error code in Java client as a special case * Rethrow as internal `ClientSchemaVersionMismatchException` * Catch on Table level, retry operations with specified schema * Fix `SchemaVersionMismatchException` detection in `ClientInboundMessageHandler` Additionally, fix IGNITE-20103 - a separate fix would be difficult to test. When schema changes (due to ALTER TABLE), old rows are wrapped into `UpgradingRowAdapter`, which was not expected in `ClientTableCommon.writeTuple`: * Fix `UpgradingRowAdapter` to avoid returning incorrect `BinaryTuple` * Handle null `BinaryTuple` in `ClientTableCommon.writeTuple` --- .../org/apache/ignite/lang/IgniteException.java | 2 +- .../client/proto/ClientBinaryTupleUtils.java | 95 +++++++++++++++++++ .../handler/ClientInboundMessageHandler.java | 19 +++- .../handler/requests/table/ClientTableCommon.java | 25 ++++- .../ClientSchemaVersionMismatchException.java | 57 ++++++++++++ .../ignite/internal/client/TcpClientChannel.java | 27 +++++- .../ignite/internal/client/table/ClientTable.java | 103 +++++++++++++++++++-- .../client/table/ClientTupleSerializer.java | 94 +------------------ .../ignite/internal/util/ExceptionUtils.java | 2 +- .../ignite/lang/IgniteInternalException.java | 2 +- .../ItThinClientSchemaSynchronizationTest.java | 66 ++++++++++--- .../streamer/ItAbstractDataStreamerTest.java | 52 +++++++++-- .../schema/registry/UpgradingRowAdapter.java | 8 ++ 13 files changed, 417 insertions(+), 135 deletions(-) diff --git a/modules/api/src/main/java/org/apache/ignite/lang/IgniteException.java b/modules/api/src/main/java/org/apache/ignite/lang/IgniteException.java index a6120acbe7..1a31df9d6d 100644 --- a/modules/api/src/main/java/org/apache/ignite/lang/IgniteException.java +++ b/modules/api/src/main/java/org/apache/ignite/lang/IgniteException.java @@ -181,7 +181,7 @@ public class IgniteException extends RuntimeException implements TraceableExcept * @param message Detailed message. * @param cause Optional nested exception (can be {@code null}). */ - public IgniteException(UUID traceId, int code, String message, Throwable cause) { + public IgniteException(UUID traceId, int code, String message, @Nullable Throwable cause) { super(message, cause); this.traceId = traceId; diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientBinaryTupleUtils.java b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientBinaryTupleUtils.java index 9d464eb4c4..d3eaf81bc5 100644 --- a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientBinaryTupleUtils.java +++ b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientBinaryTupleUtils.java @@ -33,6 +33,7 @@ import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder; import org.apache.ignite.internal.binarytuple.BinaryTupleReader; import org.apache.ignite.lang.IgniteException; import org.apache.ignite.sql.ColumnType; +import org.jetbrains.annotations.Nullable; /** * Client binary tuple utils. @@ -192,6 +193,100 @@ public class ClientBinaryTupleUtils { } } + /** + * Writes a column value to the binary tuple. + * + * @param builder Builder. + * @param type Column type. + * @param name Column name. + * @param scale Scale. + * @param v Value. + */ + public static void appendValue(BinaryTupleBuilder builder, ColumnType type, String name, int scale, @Nullable Object v) { + if (v == null) { + builder.appendNull(); + return; + } + + try { + switch (type) { + case BOOLEAN: + builder.appendBoolean((boolean) v); + return; + + case INT8: + builder.appendByte((byte) v); + return; + + case INT16: + builder.appendShort((short) v); + return; + + case INT32: + builder.appendInt((int) v); + return; + + case INT64: + builder.appendLong((long) v); + return; + + case FLOAT: + builder.appendFloat((float) v); + return; + + case DOUBLE: + builder.appendDouble((double) v); + return; + + case DECIMAL: + builder.appendDecimalNotNull((BigDecimal) v, scale); + return; + + case UUID: + builder.appendUuidNotNull((UUID) v); + return; + + case STRING: + builder.appendStringNotNull((String) v); + return; + + case BYTE_ARRAY: + builder.appendBytesNotNull((byte[]) v); + return; + + case BITMASK: + builder.appendBitmaskNotNull((BitSet) v); + return; + + case DATE: + builder.appendDateNotNull((LocalDate) v); + return; + + case TIME: + builder.appendTimeNotNull((LocalTime) v); + return; + + case DATETIME: + builder.appendDateTimeNotNull((LocalDateTime) v); + return; + + case TIMESTAMP: + builder.appendTimestampNotNull((Instant) v); + return; + + case NUMBER: + builder.appendNumberNotNull((BigInteger) v); + return; + + default: + throw new IllegalArgumentException("Unsupported type: " + type); + } + } catch (ClassCastException e) { + throw new IgniteException(PROTOCOL_ERR, "Incorrect value type for column '" + name + "': " + e.getMessage(), e); + } + } + + private static void appendTypeAndScale(BinaryTupleBuilder builder, ColumnType type, int scale) { builder.appendInt(type.ordinal()); builder.appendInt(scale); diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java index f41ebf8e8c..ef195a296d 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java @@ -396,7 +396,8 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter im } private void writeErrorCore(Throwable err, ClientMessagePacker packer) { - err = ExceptionUtils.unwrapCause(err); + SchemaVersionMismatchException schemaVersionMismatchException = schemaVersionMismatchException(err); + err = schemaVersionMismatchException == null ? ExceptionUtils.unwrapCause(err) : schemaVersionMismatchException; // Trace ID and error code. if (err instanceof TraceableException) { @@ -420,10 +421,10 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter im } // Extensions. - if (err instanceof SchemaVersionMismatchException) { + if (schemaVersionMismatchException != null) { packer.packMapHeader(1); packer.packString(ErrorExtensions.EXPECTED_SCHEMA_VERSION); - packer.packInt(((SchemaVersionMismatchException) err).expectedVersion()); + packer.packInt(schemaVersionMismatchException.expectedVersion()); } else { packer.packNil(); // No extensions. } @@ -711,4 +712,16 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter im throw new IllegalArgumentException("Unsupported extension type: " + type.getName()); } } + + private static @Nullable SchemaVersionMismatchException schemaVersionMismatchException(Throwable e) { + while (e != null) { + if (e instanceof SchemaVersionMismatchException) { + return (SchemaVersionMismatchException) e; + } + + e = e.getCause(); + } + + return null; + } } diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java index d04e4e1ba7..0c05718307 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java @@ -17,6 +17,7 @@ package org.apache.ignite.client.handler.requests.table; +import static org.apache.ignite.internal.client.proto.ClientMessageCommon.NO_VALUE; import static org.apache.ignite.lang.ErrorGroups.Client.PROTOCOL_ERR; import static org.apache.ignite.lang.ErrorGroups.Client.TABLE_ID_NOT_FOUND_ERR; @@ -24,8 +25,10 @@ import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.CompletableFuture; import org.apache.ignite.client.handler.ClientResourceRegistry; +import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder; import org.apache.ignite.internal.binarytuple.BinaryTupleContainer; import org.apache.ignite.internal.binarytuple.BinaryTupleReader; +import org.apache.ignite.internal.client.proto.ClientBinaryTupleUtils; import org.apache.ignite.internal.client.proto.ClientMessagePacker; import org.apache.ignite.internal.client.proto.ClientMessageUnpacker; import org.apache.ignite.internal.client.proto.TuplePart; @@ -132,13 +135,27 @@ public class ClientTableCommon { assert tuple instanceof BinaryTupleContainer : "Tuple must be a BinaryTupleContainer: " + tuple.getClass(); BinaryTupleReader binaryTuple = ((BinaryTupleContainer) tuple).binaryTuple(); - assert binaryTuple != null : "Binary tuple must not be null: " + tuple.getClass(); int elementCount = part == TuplePart.KEY ? schema.keyColumns().length() : schema.length(); - assert elementCount == binaryTuple.elementCount() : - "Tuple element count mismatch: " + elementCount + " != " + binaryTuple.elementCount(); - packer.packBinaryTuple(binaryTuple); + if (binaryTuple != null) { + assert elementCount == binaryTuple.elementCount() : + "Tuple element count mismatch: " + elementCount + " != " + binaryTuple.elementCount() + " (" + tuple.getClass() + ")"; + + packer.packBinaryTuple(binaryTuple); + } else { + // Underlying binary tuple is not available or can't be used as is, pack columns one by one. + var builder = new BinaryTupleBuilder(elementCount); + + for (var i = 0; i < elementCount; i++) { + var col = schema.column(i); + Object v = tuple.valueOrDefault(col.name(), NO_VALUE); + + ClientBinaryTupleUtils.appendValue(builder, getColumnType(col.type().spec()), col.name(), getDecimalScale(col.type()), v); + } + + packer.packBinaryTuple(builder); + } } /** diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientSchemaVersionMismatchException.java b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientSchemaVersionMismatchException.java new file mode 100644 index 0000000000..ff9bb0ec7f --- /dev/null +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientSchemaVersionMismatchException.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client; + +import java.util.UUID; +import org.apache.ignite.lang.IgniteInternalException; +import org.jetbrains.annotations.Nullable; + +/** + * Indicates incompatible schema version. + */ +public class ClientSchemaVersionMismatchException extends IgniteInternalException { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** Expected schema version. */ + private final int expectedVersion; + + /** + * Constructor. + * + * @param traceId Trace ID. + * @param code Error code. + * @param message String message. + * @param expectedVersion Expected schema version. + * @param cause Cause. + */ + ClientSchemaVersionMismatchException(UUID traceId, int code, @Nullable String message, int expectedVersion, @Nullable Throwable cause) { + super(traceId, code, message, cause); + + this.expectedVersion = expectedVersion; + } + + /** + * Gets expected schema version. + * + * @return Expected schema version. + */ + public int expectedVersion() { + return expectedVersion; + } +} diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java index 599e888332..61f72bbebb 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java @@ -51,12 +51,14 @@ import org.apache.ignite.internal.client.proto.ClientMessageCommon; import org.apache.ignite.internal.client.proto.ClientMessagePacker; import org.apache.ignite.internal.client.proto.ClientMessageUnpacker; import org.apache.ignite.internal.client.proto.ClientOp; +import org.apache.ignite.internal.client.proto.ErrorExtensions; import org.apache.ignite.internal.client.proto.HandshakeExtension; import org.apache.ignite.internal.client.proto.ProtocolVersion; import org.apache.ignite.internal.client.proto.ResponseFlags; import org.apache.ignite.internal.client.proto.ServerMessageType; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.tostring.S; +import org.apache.ignite.lang.ErrorGroups.Table; import org.apache.ignite.lang.IgniteCheckedException; import org.apache.ignite.lang.IgniteException; import org.apache.ignite.lang.IgniteExceptionUtils; @@ -427,13 +429,34 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon private static <T extends Throwable> T readError(ClientMessageUnpacker unpacker) { var traceId = unpacker.unpackUuid(); var code = unpacker.unpackInt(); + var errClassName = unpacker.unpackString(); var errMsg = unpacker.tryUnpackNil() ? null : unpacker.unpackString(); IgniteException causeWithStackTrace = unpacker.tryUnpackNil() ? null : new IgniteException(traceId, code, unpacker.unpackString()); - // TODO IGNITE-19837 Retry outdated schema error - unpacker.skipValues(1); // Error extensions. + if (code == Table.SCHEMA_VERSION_MISMATCH_ERR) { + int extSize = unpacker.tryUnpackNil() ? 0 : unpacker.unpackMapHeader(); + int expectedSchemaVersion = -1; + + for (int i = 0; i < extSize; i++) { + String key = unpacker.unpackString(); + + if (key.equals(ErrorExtensions.EXPECTED_SCHEMA_VERSION)) { + expectedSchemaVersion = unpacker.unpackInt(); + } else { + // Unknown extension - ignore. + unpacker.skipValues(1); + } + } + + if (expectedSchemaVersion == -1) { + return (T) new IgniteException( + traceId, PROTOCOL_ERR, "Expected schema version is not specified in error extension map.", causeWithStackTrace); + } + + return (T) new ClientSchemaVersionMismatchException(traceId, code, errMsg, expectedSchemaVersion, causeWithStackTrace); + } try { // TODO https://issues.apache.org/jira/browse/IGNITE-19539 diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java index e71e6cf786..b31328a824 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java @@ -30,6 +30,7 @@ import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Function; import org.apache.ignite.client.RetryPolicy; +import org.apache.ignite.internal.client.ClientSchemaVersionMismatchException; import org.apache.ignite.internal.client.ClientUtils; import org.apache.ignite.internal.client.PayloadOutputChannel; import org.apache.ignite.internal.client.ReliableChannel; @@ -79,8 +80,8 @@ public class ClientTable implements Table { /** * Constructor. * - * @param ch Channel. - * @param id Table id. + * @param ch Channel. + * @param id Table id. * @param name Table name. */ public ClientTable(ReliableChannel ch, int id, String name) { @@ -259,7 +260,7 @@ public class ClientTable implements Table { /** * Writes transaction, if present. * - * @param tx Transaction. + * @param tx Transaction. * @param out Packer. */ public static void writeTx(@Nullable Transaction tx, PayloadOutputChannel out) { @@ -284,12 +285,25 @@ public class ClientTable implements Table { @Nullable T defaultValue, @Nullable PartitionAwarenessProvider provider ) { - CompletableFuture<ClientSchema> schemaFut = getLatestSchema(); + return doSchemaOutInOpAsync(opCode, writer, reader, defaultValue, provider, null); + } + + private <T> CompletableFuture<T> doSchemaOutInOpAsync( + int opCode, + BiConsumer<ClientSchema, PayloadOutputChannel> writer, + BiFunction<ClientSchema, ClientMessageUnpacker, T> reader, + @Nullable T defaultValue, + @Nullable PartitionAwarenessProvider provider, + @Nullable Integer schemaVersionOverride + ) { + CompletableFuture<T> fut = new CompletableFuture<>(); + + CompletableFuture<ClientSchema> schemaFut = getSchema(schemaVersionOverride == null ? latestSchemaVer : schemaVersionOverride); CompletableFuture<List<String>> partitionsFut = provider == null || !provider.isPartitionAwarenessEnabled() ? CompletableFuture.completedFuture(null) : getPartitionAssignment(); - return CompletableFuture.allOf(schemaFut, partitionsFut) + CompletableFuture.allOf(schemaFut, partitionsFut) .thenCompose(v -> { ClientSchema schema = schemaFut.getNow(null); String preferredNodeName = getPreferredNodeName(provider, partitionsFut.getNow(null), schema); @@ -300,7 +314,30 @@ public class ClientTable implements Table { preferredNodeName, null); }) - .thenCompose(t -> loadSchemaAndReadData(t, reader)); + .thenCompose(t -> loadSchemaAndReadData(t, reader)) + .whenComplete((res, err) -> { + if (err != null) { + if (err.getCause() instanceof ClientSchemaVersionMismatchException) { + // Retry with specific schema version. + int expectedVersion = ((ClientSchemaVersionMismatchException) err.getCause()).expectedVersion(); + + doSchemaOutInOpAsync(opCode, writer, reader, defaultValue, provider, expectedVersion) + .whenComplete((res0, err0) -> { + if (err0 != null) { + fut.completeExceptionally(err0); + } else { + fut.complete(res0); + } + }); + } else { + fut.completeExceptionally(err); + } + } else { + fut.complete(res); + } + }); + + return fut; } /** @@ -318,7 +355,7 @@ public class ClientTable implements Table { BiConsumer<ClientSchema, PayloadOutputChannel> writer, Function<ClientMessageUnpacker, T> reader, @Nullable PartitionAwarenessProvider provider) { - return doSchemaOutOpAsync(opCode, writer, reader, provider, null); + return doSchemaOutOpAsync(opCode, writer, reader, provider, null, null); } /** @@ -331,19 +368,42 @@ public class ClientTable implements Table { * @param <T> Result type. * @return Future representing pending completion of the operation. */ - public <T> CompletableFuture<T> doSchemaOutOpAsync( + <T> CompletableFuture<T> doSchemaOutOpAsync( int opCode, BiConsumer<ClientSchema, PayloadOutputChannel> writer, Function<ClientMessageUnpacker, T> reader, @Nullable PartitionAwarenessProvider provider, @Nullable RetryPolicy retryPolicyOverride) { + return doSchemaOutOpAsync(opCode, writer, reader, provider, retryPolicyOverride, null); + } + + /** + * Performs a schema-based operation. + * + * @param opCode Op code. + * @param writer Writer. + * @param reader Reader. + * @param provider Partition awareness provider. + * @param retryPolicyOverride Retry policy override. + * @param schemaVersionOverride Schema version override. + * @param <T> Result type. + * @return Future representing pending completion of the operation. + */ + private <T> CompletableFuture<T> doSchemaOutOpAsync( + int opCode, + BiConsumer<ClientSchema, PayloadOutputChannel> writer, + Function<ClientMessageUnpacker, T> reader, + @Nullable PartitionAwarenessProvider provider, + @Nullable RetryPolicy retryPolicyOverride, + @Nullable Integer schemaVersionOverride) { + CompletableFuture<T> fut = new CompletableFuture<>(); - CompletableFuture<ClientSchema> schemaFut = getLatestSchema(); + CompletableFuture<ClientSchema> schemaFut = getSchema(schemaVersionOverride == null ? latestSchemaVer : schemaVersionOverride); CompletableFuture<List<String>> partitionsFut = provider == null || !provider.isPartitionAwarenessEnabled() ? CompletableFuture.completedFuture(null) : getPartitionAssignment(); - return CompletableFuture.allOf(schemaFut, partitionsFut) + CompletableFuture.allOf(schemaFut, partitionsFut) .thenCompose(v -> { ClientSchema schema = schemaFut.getNow(null); String preferredNodeName = getPreferredNodeName(provider, partitionsFut.getNow(null), schema); @@ -357,7 +417,30 @@ public class ClientTable implements Table { }, preferredNodeName, retryPolicyOverride); + }) + .whenComplete((res, err) -> { + if (err != null) { + if (err.getCause() instanceof ClientSchemaVersionMismatchException) { + // Retry with specific schema version. + int expectedVersion = ((ClientSchemaVersionMismatchException) err.getCause()).expectedVersion(); + + doSchemaOutOpAsync(opCode, writer, reader, provider, retryPolicyOverride, expectedVersion) + .whenComplete((res0, err0) -> { + if (err0 != null) { + fut.completeExceptionally(err0); + } else { + fut.complete(res0); + } + }); + } else { + fut.completeExceptionally(err); + } + } else { + fut.complete(res); + } }); + + return fut; } private <T> @Nullable Object readSchemaAndReadData( diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTupleSerializer.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTupleSerializer.java index 23341d96f6..f8199d31b3 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTupleSerializer.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTupleSerializer.java @@ -19,14 +19,7 @@ package org.apache.ignite.internal.client.table; import static org.apache.ignite.internal.client.proto.ClientMessageCommon.NO_VALUE; import static org.apache.ignite.internal.client.table.ClientTable.writeTx; -import static org.apache.ignite.lang.ErrorGroups.Client.PROTOCOL_ERR; - -import java.math.BigDecimal; -import java.math.BigInteger; -import java.time.Instant; -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.LocalTime; + import java.util.ArrayList; import java.util.BitSet; import java.util.Collection; @@ -34,16 +27,15 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.UUID; import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder; import org.apache.ignite.internal.binarytuple.BinaryTupleReader; import org.apache.ignite.internal.client.PayloadOutputChannel; +import org.apache.ignite.internal.client.proto.ClientBinaryTupleUtils; import org.apache.ignite.internal.client.proto.ClientMessageUnpacker; import org.apache.ignite.internal.client.proto.TuplePart; import org.apache.ignite.internal.client.tx.ClientTransaction; import org.apache.ignite.internal.util.HashCalculator; import org.apache.ignite.lang.IgniteBiTuple; -import org.apache.ignite.lang.IgniteException; import org.apache.ignite.table.Tuple; import org.apache.ignite.table.mapper.Mapper; import org.apache.ignite.tx.Transaction; @@ -316,93 +308,13 @@ public class ClientTupleSerializer { } private static void appendValue(BinaryTupleBuilder builder, BitSet noValueSet, ClientColumn col, Object v) { - if (v == null) { - builder.appendNull(); - return; - } - if (v == NO_VALUE) { noValueSet.set(col.schemaIndex()); builder.appendNull(); return; } - try { - switch (col.type()) { - case BOOLEAN: - builder.appendBoolean((boolean) v); - return; - - case INT8: - builder.appendByte((byte) v); - return; - - case INT16: - builder.appendShort((short) v); - return; - - case INT32: - builder.appendInt((int) v); - return; - - case INT64: - builder.appendLong((long) v); - return; - - case FLOAT: - builder.appendFloat((float) v); - return; - - case DOUBLE: - builder.appendDouble((double) v); - return; - - case DECIMAL: - builder.appendDecimalNotNull((BigDecimal) v, col.scale()); - return; - - case UUID: - builder.appendUuidNotNull((UUID) v); - return; - - case STRING: - builder.appendStringNotNull((String) v); - return; - - case BYTE_ARRAY: - builder.appendBytesNotNull((byte[]) v); - return; - - case BITMASK: - builder.appendBitmaskNotNull((BitSet) v); - return; - - case DATE: - builder.appendDateNotNull((LocalDate) v); - return; - - case TIME: - builder.appendTimeNotNull((LocalTime) v); - return; - - case DATETIME: - builder.appendDateTimeNotNull((LocalDateTime) v); - return; - - case TIMESTAMP: - builder.appendTimestampNotNull((Instant) v); - return; - - case NUMBER: - builder.appendNumberNotNull((BigInteger) v); - return; - - default: - throw new IllegalArgumentException("Unsupported type: " + col.type()); - } - } catch (ClassCastException e) { - throw new IgniteException(PROTOCOL_ERR, "Incorrect value type for column '" + col.name() + "': " + e.getMessage(), e); - } + ClientBinaryTupleUtils.appendValue(builder, col.type(), col.name(), col.scale(), v); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java index 3c6f88f80e..c722f505d7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java @@ -449,7 +449,7 @@ public final class ExceptionUtils { * @param t Throwable to extract a trace identifier. * @return Returns trace identifier. */ - public static UUID getOrCreateTraceId(Throwable t) { + public static UUID getOrCreateTraceId(@Nullable Throwable t) { Throwable e = t; // This collection is used to avoid infinite loops in case of cyclic dependencies. diff --git a/modules/core/src/main/java/org/apache/ignite/lang/IgniteInternalException.java b/modules/core/src/main/java/org/apache/ignite/lang/IgniteInternalException.java index 974b3ff2a1..73f5b04b6d 100644 --- a/modules/core/src/main/java/org/apache/ignite/lang/IgniteInternalException.java +++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteInternalException.java @@ -142,7 +142,7 @@ public class IgniteInternalException extends RuntimeException implements Traceab * @param message Detail message. * @param cause Optional nested exception (can be {@code null}). */ - public IgniteInternalException(UUID traceId, int code, String message, @Nullable Throwable cause) { + public IgniteInternalException(UUID traceId, int code, @Nullable String message, @Nullable Throwable cause) { super(message, cause); this.traceId = traceId; diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSchemaSynchronizationTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSchemaSynchronizationTest.java index 1be0a64acd..e50b50e944 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSchemaSynchronizationTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSchemaSynchronizationTest.java @@ -17,12 +17,10 @@ package org.apache.ignite.internal.runner.app.client; -import static org.hamcrest.CoreMatchers.containsString; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; import org.apache.ignite.client.IgniteClient; -import org.apache.ignite.lang.IgniteException; import org.apache.ignite.sql.Session; import org.apache.ignite.table.RecordView; import org.apache.ignite.table.Tuple; @@ -31,15 +29,15 @@ import org.junit.jupiter.api.Test; /** * Tests for client schema synchronization. */ +@SuppressWarnings("resource") public class ItThinClientSchemaSynchronizationTest extends ItAbstractThinClientTest { - @SuppressWarnings("resource") @Test - void testOutdatedSchemaFromClientThrowsExceptionOnServer() throws InterruptedException { + void testClientUsesLatestSchemaOnWrite() throws InterruptedException { IgniteClient client = client(); Session ses = client.sql().createSession(); // Create table, insert data. - String tableName = "testOutdatedSchemaFromClientThrowsExceptionOnServer"; + String tableName = "testClientUsesLatestSchemaOnWrite"; ses.execute(null, "CREATE TABLE " + tableName + "(ID INT NOT NULL PRIMARY KEY)"); waitForTableOnAllNodes(tableName); @@ -48,11 +46,55 @@ public class ItThinClientSchemaSynchronizationTest extends ItAbstractThinClientT Tuple rec = Tuple.create().set("ID", 1); recordView.insert(null, rec); - // Modify table, get data - client will use old schema. - ses.execute(null, "ALTER TABLE testOutdatedSchemaFromClientThrowsExceptionOnServer ADD COLUMN NAME VARCHAR"); + // Modify table, insert data - client will use old schema, receive error, retry with new schema. + // The process is transparent for the user: updated schema is in effect immediately. + ses.execute(null, "ALTER TABLE " + tableName + " ADD COLUMN NAME VARCHAR NOT NULL"); - // TODO IGNITE-19837 Retry outdated schema error - IgniteException ex = assertThrows(IgniteException.class, () -> recordView.insert(null, rec)); - assertThat(ex.getMessage(), containsString("Schema version mismatch [expectedVer=2, actualVer=1]")); + Tuple rec2 = Tuple.create().set("ID", 1).set("NAME", "name"); + recordView.upsert(null, rec2); + + assertEquals("name", recordView.get(null, rec).stringValue(1)); + } + + @Test + void testClientUsesLatestSchemaOnRead() throws InterruptedException { + IgniteClient client = client(); + Session ses = client.sql().createSession(); + + // Create table, insert data. + String tableName = "testClientUsesLatestSchemaOnRead"; + ses.execute(null, "CREATE TABLE " + tableName + "(ID INT NOT NULL PRIMARY KEY)"); + + waitForTableOnAllNodes(tableName); + RecordView<Tuple> recordView = client.tables().table(tableName).recordView(); + + Tuple rec = Tuple.create().set("ID", 1); + recordView.insert(null, rec); + + // Modify table, insert data - client will use old schema, receive error, retry with new schema. + // The process is transparent for the user: updated schema is in effect immediately. + ses.execute(null, "ALTER TABLE " + tableName + " ADD COLUMN NAME VARCHAR DEFAULT 'def_name'"); + assertEquals("def_name", recordView.get(null, rec).stringValue(1)); + } + + @Test + void testClientUsesLatestSchemaOnReadWithNotNullColumn() throws InterruptedException { + IgniteClient client = client(); + Session ses = client.sql().createSession(); + + // Create table, insert data. + String tableName = "testClientUsesLatestSchemaOnReadWithNotNullColumn"; + ses.execute(null, "CREATE TABLE " + tableName + "(ID INT NOT NULL PRIMARY KEY)"); + + waitForTableOnAllNodes(tableName); + RecordView<Tuple> recordView = client.tables().table(tableName).recordView(); + + Tuple rec = Tuple.create().set("ID", 1); + recordView.insert(null, rec); + + // Modify table and get old row. + // It still has null value in the old column, even though it is not allowed by the new schema. + ses.execute(null, "ALTER TABLE " + tableName + " ADD COLUMN NAME VARCHAR NOT NULL"); + assertNull(recordView.get(null, rec).stringValue(1)); } } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java index b4e67b24ea..dda034d88c 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java @@ -32,6 +32,7 @@ import java.util.concurrent.SubmissionPublisher; import java.util.concurrent.TimeUnit; import org.apache.ignite.Ignite; import org.apache.ignite.internal.sql.engine.ClusterPerClassIntegrationTest; +import org.apache.ignite.sql.Session; import org.apache.ignite.table.DataStreamerOptions; import org.apache.ignite.table.KeyValueView; import org.apache.ignite.table.RecordView; @@ -144,16 +145,7 @@ public abstract class ItAbstractDataStreamerTest extends ClusterPerClassIntegrat view.streamData(publisher, options); publisher.submit(tuple(1, "foo")); - assertTrue(waitForCondition(() -> { - @SuppressWarnings("resource") - var tx = ignite().transactions().begin(new TransactionOptions().readOnly(true)); - - try { - return view.get(tx, tupleKey(1)) != null; - } finally { - tx.rollback(); - } - }, 50, 5000)); + waitForKey(view, tupleKey(1)); } } @@ -213,6 +205,46 @@ public abstract class ItAbstractDataStreamerTest extends ClusterPerClassIntegrat assertNull(view.get(null, tupleKey(10_000))); } + @SuppressWarnings("resource") + @Test + public void testSchemaUpdateWhileStreaming() throws InterruptedException { + Session ses = ignite().sql().createSession(); + + String tableName = "testSchemaUpdateWhileStreaming"; + ses.execute(null, "CREATE TABLE " + tableName + "(ID INT NOT NULL PRIMARY KEY)"); + RecordView<Tuple> view = ignite().tables().table(tableName).recordView(); + + CompletableFuture<Void> streamerFut; + + try (var publisher = new SubmissionPublisher<Tuple>()) { + var options = DataStreamerOptions.builder().batchSize(1).build(); + streamerFut = view.streamData(publisher, options); + + publisher.submit(tupleKey(1)); + waitForKey(view, tupleKey(1)); + + ses.execute(null, "ALTER TABLE " + tableName + " ADD COLUMN NAME VARCHAR NOT NULL"); + publisher.submit(tuple(2, "bar")); + } + + streamerFut.orTimeout(1, TimeUnit.SECONDS).join(); + + assertEquals("bar", view.get(null, tupleKey(2)).stringValue("name")); + } + + private void waitForKey(RecordView<Tuple> view, Tuple key) throws InterruptedException { + assertTrue(waitForCondition(() -> { + @SuppressWarnings("resource") + var tx = ignite().transactions().begin(new TransactionOptions().readOnly(true)); + + try { + return view.get(tx, key) != null; + } finally { + tx.rollback(); + } + }, 50, 5000)); + } + private Table defaultTable() { //noinspection resource return ignite().tables().table(TABLE_NAME); diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java index dff0f63f1a..51924510a0 100644 --- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java +++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java @@ -26,6 +26,7 @@ import java.time.LocalTime; import java.util.BitSet; import java.util.UUID; import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.schema.BinaryTuple; import org.apache.ignite.internal.schema.BinaryTupleSchema; import org.apache.ignite.internal.schema.Column; import org.apache.ignite.internal.schema.InvalidTypeException; @@ -418,4 +419,11 @@ class UpgradingRowAdapter extends Row { return mappedId < 0 ? (Instant) column.defaultValue() : super.timestampValue(mappedId); } + + /** {@inheritDoc} */ + @Override + public BinaryTuple binaryTuple() { + // Underlying binary tuple can not be used directly. + return null; + } }