This is an automated email from the ASF dual-hosted git repository. ppa pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new a1fe0da528 IGNITE-20181: KV/Binary view public API should only throw public exceptions (#2523) a1fe0da528 is described below commit a1fe0da528ab930ad806056258a3e2194395998b Author: Max Zhuravkov <shh...@gmail.com> AuthorDate: Fri Sep 8 13:30:38 2023 +0300 IGNITE-20181: KV/Binary view public API should only throw public exceptions (#2523) --- .../apache/ignite/lang/MarshallerException.java | 18 +++++- .../ignite/lang/UnexpectedNullValueException.java | 3 +- .../ignite/internal/IgniteExceptionArchTest.java | 2 - .../Table/SchemaValidationTest.cs | 16 +++--- .../ignite/internal/table/AbstractTableView.java | 34 ++++------- .../internal/table/KeyValueBinaryViewImpl.java | 46 ++++++++------- ...aluePojoStreamerPartitionAwarenessProvider.java | 2 +- .../ignite/internal/table/KeyValueViewImpl.java | 67 ++++++++++++---------- .../internal/table/RecordBinaryViewImpl.java | 49 +++++++++------- .../ignite/internal/table/RecordViewImpl.java | 59 ++++++++++--------- .../internal/table/TableViewRowConverter.java | 66 +++++++++++++++++++++ 11 files changed, 226 insertions(+), 136 deletions(-) diff --git a/modules/api/src/main/java/org/apache/ignite/lang/MarshallerException.java b/modules/api/src/main/java/org/apache/ignite/lang/MarshallerException.java index 5a10d26718..98f1d7d41b 100644 --- a/modules/api/src/main/java/org/apache/ignite/lang/MarshallerException.java +++ b/modules/api/src/main/java/org/apache/ignite/lang/MarshallerException.java @@ -17,6 +17,10 @@ package org.apache.ignite.lang; +import java.util.UUID; +import org.apache.ignite.lang.ErrorGroups.Common; +import org.jetbrains.annotations.Nullable; + /** * This exception is caused by a failure to marshall or unmarshall a value. * The failure can be due to a value not matching the a schema or to another reason. @@ -28,6 +32,18 @@ public class MarshallerException extends IgniteException { * @param cause Non-null throwable cause. */ public MarshallerException(Throwable cause) { - super(cause); + super(Common.INTERNAL_ERR, cause); + } + + /** + * Creates an exception with the given trace ID, error code, detailed message, and cause. + * + * @param traceId Unique identifier of the exception. + * @param code Full error code. + * @param message Detailed message. + * @param cause Optional nested exception (can be {@code null}). + */ + public MarshallerException(UUID traceId, int code, String message, @Nullable Throwable cause) { + super(traceId, code, message, cause); } } diff --git a/modules/api/src/main/java/org/apache/ignite/lang/UnexpectedNullValueException.java b/modules/api/src/main/java/org/apache/ignite/lang/UnexpectedNullValueException.java index 4a5acdd428..4caf9697b7 100644 --- a/modules/api/src/main/java/org/apache/ignite/lang/UnexpectedNullValueException.java +++ b/modules/api/src/main/java/org/apache/ignite/lang/UnexpectedNullValueException.java @@ -18,6 +18,7 @@ package org.apache.ignite.lang; import java.util.UUID; +import org.apache.ignite.lang.ErrorGroups.Common; /** * This exception is thrown instead of returning a null value from a method that does not respect {@code null}-value to avoid ambiguity @@ -30,7 +31,7 @@ public class UnexpectedNullValueException extends IgniteException { * @param msg Message. */ public UnexpectedNullValueException(String msg) { - super("Got unexpected null value: " + msg); + super(Common.INTERNAL_ERR, msg); } /** diff --git a/modules/arch-test/src/test/java/org/apache/ignite/internal/IgniteExceptionArchTest.java b/modules/arch-test/src/test/java/org/apache/ignite/internal/IgniteExceptionArchTest.java index f70ee7cd5a..cb94459420 100644 --- a/modules/arch-test/src/test/java/org/apache/ignite/internal/IgniteExceptionArchTest.java +++ b/modules/arch-test/src/test/java/org/apache/ignite/internal/IgniteExceptionArchTest.java @@ -39,7 +39,6 @@ import org.apache.ignite.client.IgniteClientFeatureNotSupportedByServerException import org.apache.ignite.lang.IgniteCheckedException; import org.apache.ignite.lang.IgniteException; import org.apache.ignite.lang.LocationProvider.RootLocationProvider; -import org.apache.ignite.lang.MarshallerException; import org.apache.ignite.network.UnresolvableConsistentIdException; import org.apache.ignite.security.AuthenticationException; import org.apache.ignite.security.UnknownAuthenticationTypeException; @@ -86,7 +85,6 @@ public class IgniteExceptionArchTest { exclusions.add(IgniteClientAuthenticationException.class.getCanonicalName()); exclusions.add(IgniteClientConnectionException.class.getCanonicalName()); exclusions.add(IgniteClientFeatureNotSupportedByServerException.class.getCanonicalName()); - exclusions.add(MarshallerException.class.getCanonicalName()); exclusions.add(UnresolvableConsistentIdException.class.getCanonicalName()); exclusions.add(AuthenticationException.class.getCanonicalName()); exclusions.add(UnknownAuthenticationTypeException.class.getCanonicalName()); diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaValidationTest.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaValidationTest.cs index 98f2d3e3ee..a512f7264c 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaValidationTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaValidationTest.cs @@ -106,7 +106,7 @@ public class SchemaValidationTest : IgniteTestsBase [ValCol] = "v" }; - var ex = Assert.ThrowsAsync<IgniteException>(async () => await TupleView.UpsertAsync(null, igniteTuple)); + var ex = Assert.ThrowsAsync<MarshallerException>(async () => await TupleView.UpsertAsync(null, igniteTuple)); Assert.AreEqual("Missed key column: KEY", ex!.Message); } @@ -118,7 +118,7 @@ public class SchemaValidationTest : IgniteTestsBase [KeyCol] = 1L }; - var ex = Assert.ThrowsAsync<IgniteException>(async () => await TableRequiredVal.RecordBinaryView.UpsertAsync(null, igniteTuple)); + var ex = Assert.ThrowsAsync<MarshallerException>(async () => await TableRequiredVal.RecordBinaryView.UpsertAsync(null, igniteTuple)); StringAssert.StartsWith("Failed to set column (null was passed, but column is not null", ex!.Message); StringAssert.Contains("name=VAL", ex.Message); } @@ -133,7 +133,7 @@ public class SchemaValidationTest : IgniteTestsBase [ValCol] = "v" }; - var ex = Assert.ThrowsAsync<IgniteException>(async () => await Table.KeyValueBinaryView.PutAsync(null, keyTuple, valTuple)); + var ex = Assert.ThrowsAsync<MarshallerException>(async () => await Table.KeyValueBinaryView.PutAsync(null, keyTuple, valTuple)); Assert.AreEqual("Missed key column: KEY", ex!.Message); } @@ -147,7 +147,7 @@ public class SchemaValidationTest : IgniteTestsBase var valTuple = new IgniteTuple(); - var ex = Assert.ThrowsAsync<IgniteException>( + var ex = Assert.ThrowsAsync<MarshallerException>( async () => await TableRequiredVal.KeyValueBinaryView.PutAsync(null, keyTuple, valTuple)); StringAssert.StartsWith("Failed to set column (null was passed, but column is not null", ex!.Message); StringAssert.Contains("name=VAL", ex.Message); @@ -248,7 +248,7 @@ public class SchemaValidationTest : IgniteTestsBase [Test] public void TestMissingKeyPocoFields() { - var ex = Assert.ThrowsAsync<IgniteException>(async () => await Table.GetRecordView<ValPoco>().UpsertAsync(null, new ValPoco())); + var ex = Assert.ThrowsAsync<MarshallerException>(async () => await Table.GetRecordView<ValPoco>().UpsertAsync(null, new ValPoco())); Assert.AreEqual("Missed key column: KEY", ex!.Message); } @@ -256,7 +256,7 @@ public class SchemaValidationTest : IgniteTestsBase [Test] public void TestMissingValPocoFields() { - var ex = Assert.ThrowsAsync<IgniteException>( + var ex = Assert.ThrowsAsync<MarshallerException>( async () => await TableRequiredVal.GetRecordView<KeyPoco>().UpsertAsync(null, new KeyPoco())); StringAssert.StartsWith("Failed to set column (null was passed, but column is not null", ex!.Message); @@ -266,7 +266,7 @@ public class SchemaValidationTest : IgniteTestsBase [Test] public void TestKvMissingKeyPocoFields() { - var ex = Assert.ThrowsAsync<IgniteException>( + var ex = Assert.ThrowsAsync<MarshallerException>( async () => await Table.GetKeyValueView<ValPoco, string>().PutAsync(null, new ValPoco(), "x")); Assert.AreEqual("Missed key column: KEY", ex!.Message); @@ -275,7 +275,7 @@ public class SchemaValidationTest : IgniteTestsBase [Test] public void TestKvMissingValPocoFields() { - var ex = Assert.ThrowsAsync<IgniteException>( + var ex = Assert.ThrowsAsync<MarshallerException>( async () => await TableRequiredVal.GetKeyValueView<long, KeyPoco>().PutAsync(null, 1L, new KeyPoco())); StringAssert.StartsWith("Failed to set column (null was passed, but column is not null", ex!.Message); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java index 056ec46767..bfca56c948 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java @@ -17,11 +17,13 @@ package org.apache.ignite.internal.table; +import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow; + import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import org.apache.ignite.internal.schema.SchemaRegistry; -import org.apache.ignite.lang.IgniteException; -import org.apache.ignite.lang.IgniteInternalException; +import org.apache.ignite.internal.util.ExceptionUtils; +import org.apache.ignite.lang.IgniteExceptionMapperUtil; /** * Base class for Table views. @@ -30,8 +32,8 @@ abstract class AbstractTableView { /** Internal table. */ protected final InternalTable tbl; - /** Schema registry. */ - protected final SchemaRegistry schemaReg; + /** Table row view converter. */ + protected final TableViewRowConverter rowConverter; /** * Constructor. @@ -41,7 +43,7 @@ abstract class AbstractTableView { */ protected AbstractTableView(InternalTable tbl, SchemaRegistry schemaReg) { this.tbl = tbl; - this.schemaReg = schemaReg; + this.rowConverter = new TableViewRowConverter(schemaReg); } /** @@ -57,26 +59,10 @@ abstract class AbstractTableView { } catch (InterruptedException e) { Thread.currentThread().interrupt(); // Restore interrupt flag. - throw convertException(e); + throw sneakyThrow(IgniteExceptionMapperUtil.mapToPublicException(e)); } catch (ExecutionException e) { - throw convertException(e.getCause()); - } catch (IgniteInternalException e) { - throw convertException(e); - } - } - - /** - * Converts an internal exception to a public one. - * - * @param th Internal exception. - * @return Public exception. - */ - protected IgniteException convertException(Throwable th) { - if (th instanceof IgniteException) { - return (IgniteException) th; + Throwable cause = ExceptionUtils.unwrapCause(e); + throw sneakyThrow(cause); } - - //TODO: IGNITE-20181 KV/Binary view public API should only throw public exceptions for the end user. - return new IgniteException(th); } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java index 72c89b550c..85f7371093 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.table; +import static org.apache.ignite.lang.IgniteExceptionMapperUtil.convertToPublicFuture; + import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -36,6 +38,7 @@ import org.apache.ignite.internal.streamer.StreamerBatchSender; import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.lang.IgniteException; +import org.apache.ignite.lang.MarshallerException; import org.apache.ignite.lang.NullableValue; import org.apache.ignite.table.DataStreamerOptions; import org.apache.ignite.table.KeyValueView; @@ -78,7 +81,7 @@ public class KeyValueBinaryViewImpl extends AbstractTableView implements KeyValu Row keyRow = marshal(key, null); - return tbl.get(keyRow, (InternalTransaction) tx).thenApply(this::unmarshalValue); + return convertToPublicFuture(tbl.get(keyRow, (InternalTransaction) tx).thenApply(this::unmarshalValue)); } /** @@ -112,7 +115,8 @@ public class KeyValueBinaryViewImpl extends AbstractTableView implements KeyValu public CompletableFuture<Tuple> getOrDefaultAsync(@Nullable Transaction tx, Tuple key, Tuple defaultValue) { BinaryRowEx keyRow = marshal(Objects.requireNonNull(key), null); - return tbl.get(keyRow, (InternalTransaction) tx).thenApply(r -> IgniteUtils.nonNullOrElse(unmarshalValue(r), defaultValue)); + return convertToPublicFuture(tbl.get(keyRow, (InternalTransaction) tx) + .thenApply(r -> IgniteUtils.nonNullOrElse(unmarshalValue(r), defaultValue))); } /** {@inheritDoc} */ @@ -126,7 +130,7 @@ public class KeyValueBinaryViewImpl extends AbstractTableView implements KeyValu public CompletableFuture<Map<Tuple, Tuple>> getAllAsync(@Nullable Transaction tx, Collection<Tuple> keys) { List<BinaryRowEx> keyRows = marshalKeys(Objects.requireNonNull(keys)); - return tbl.getAll(keyRows, (InternalTransaction) tx).thenApply(this::unmarshalValue); + return convertToPublicFuture(tbl.getAll(keyRows, (InternalTransaction) tx).thenApply(this::unmarshalValue)); } /** {@inheritDoc} */ @@ -155,7 +159,7 @@ public class KeyValueBinaryViewImpl extends AbstractTableView implements KeyValu Row row = marshal(key, val); - return tbl.upsert(row, (InternalTransaction) tx); + return convertToPublicFuture(tbl.upsert(row, (InternalTransaction) tx)); } /** {@inheritDoc} */ @@ -169,7 +173,7 @@ public class KeyValueBinaryViewImpl extends AbstractTableView implements KeyValu public CompletableFuture<Void> putAllAsync(@Nullable Transaction tx, Map<Tuple, Tuple> pairs) { Objects.requireNonNull(pairs); - return tbl.upsertAll(marshalPairs(pairs.entrySet()), (InternalTransaction) tx); + return convertToPublicFuture(tbl.upsertAll(marshalPairs(pairs.entrySet()), (InternalTransaction) tx)); } /** {@inheritDoc} */ @@ -186,7 +190,7 @@ public class KeyValueBinaryViewImpl extends AbstractTableView implements KeyValu Row row = marshal(key, val); - return tbl.getAndUpsert(row, (InternalTransaction) tx).thenApply(this::unmarshalValue); + return convertToPublicFuture(tbl.getAndUpsert(row, (InternalTransaction) tx).thenApply(this::unmarshalValue)); } /** @@ -224,7 +228,7 @@ public class KeyValueBinaryViewImpl extends AbstractTableView implements KeyValu Row row = marshal(key, val); - return tbl.insert(row, (InternalTransaction) tx); + return convertToPublicFuture(tbl.insert(row, (InternalTransaction) tx)); } /** {@inheritDoc} */ @@ -246,7 +250,7 @@ public class KeyValueBinaryViewImpl extends AbstractTableView implements KeyValu Row row = marshal(key, null); - return tbl.delete(row, (InternalTransaction) tx); + return convertToPublicFuture(tbl.delete(row, (InternalTransaction) tx)); } /** {@inheritDoc} */ @@ -257,7 +261,7 @@ public class KeyValueBinaryViewImpl extends AbstractTableView implements KeyValu Row row = marshal(key, val); - return tbl.deleteExact(row, (InternalTransaction) tx); + return convertToPublicFuture(tbl.deleteExact(row, (InternalTransaction) tx)); } /** {@inheritDoc} */ @@ -271,8 +275,8 @@ public class KeyValueBinaryViewImpl extends AbstractTableView implements KeyValu public CompletableFuture<Collection<Tuple>> removeAllAsync(@Nullable Transaction tx, Collection<Tuple> keys) { List<BinaryRowEx> keyRows = marshalKeys(Objects.requireNonNull(keys)); - return tbl.deleteAll(keyRows, (InternalTransaction) tx) - .thenApply(this::unmarshalKeys); + return convertToPublicFuture(tbl.deleteAll(keyRows, (InternalTransaction) tx) + .thenApply(this::unmarshalKeys)); } /** {@inheritDoc} */ @@ -288,7 +292,7 @@ public class KeyValueBinaryViewImpl extends AbstractTableView implements KeyValu public CompletableFuture<Tuple> getAndRemoveAsync(@Nullable Transaction tx, Tuple key) { Objects.requireNonNull(key); - return tbl.getAndDelete(marshal(key, null), (InternalTransaction) tx).thenApply(this::unmarshalValue); + return convertToPublicFuture(tbl.getAndDelete(marshal(key, null), (InternalTransaction) tx).thenApply(this::unmarshalValue)); } /** @@ -331,7 +335,7 @@ public class KeyValueBinaryViewImpl extends AbstractTableView implements KeyValu Row row = marshal(key, val); - return tbl.replace(row, (InternalTransaction) tx); + return convertToPublicFuture(tbl.replace(row, (InternalTransaction) tx)); } /** {@inheritDoc} */ @@ -349,7 +353,7 @@ public class KeyValueBinaryViewImpl extends AbstractTableView implements KeyValu Row oldRow = marshal(key, oldVal); Row newRow = marshal(key, newVal); - return tbl.replace(oldRow, newRow, (InternalTransaction) tx); + return convertToPublicFuture(tbl.replace(oldRow, newRow, (InternalTransaction) tx)); } /** {@inheritDoc} */ @@ -364,7 +368,7 @@ public class KeyValueBinaryViewImpl extends AbstractTableView implements KeyValu Objects.requireNonNull(key); Objects.requireNonNull(val); - return tbl.getAndReplace(marshal(key, val), (InternalTransaction) tx).thenApply(this::unmarshalValue); + return convertToPublicFuture(tbl.getAndReplace(marshal(key, val), (InternalTransaction) tx).thenApply(this::unmarshalValue)); } /** @@ -400,7 +404,7 @@ public class KeyValueBinaryViewImpl extends AbstractTableView implements KeyValu try { return marsh.marshal(key, val); } catch (TupleMarshallerException ex) { - throw convertException(ex); + throw new MarshallerException(ex); } } @@ -415,7 +419,7 @@ public class KeyValueBinaryViewImpl extends AbstractTableView implements KeyValu return null; } - return TableRow.valueTuple(schemaReg.resolve(row)); + return TableRow.valueTuple(rowConverter.resolveRow(row)); } /** @@ -427,7 +431,7 @@ public class KeyValueBinaryViewImpl extends AbstractTableView implements KeyValu private Map<Tuple, Tuple> unmarshalValue(Collection<BinaryRow> rows) { Map<Tuple, Tuple> pairs = IgniteUtils.newHashMap(rows.size()); - for (Row row : schemaReg.resolve(rows)) { + for (Row row : rowConverter.resolveRows(rows)) { if (row != null) { pairs.put(TableRow.keyTuple(row), TableRow.valueTuple(row)); } @@ -468,7 +472,7 @@ public class KeyValueBinaryViewImpl extends AbstractTableView implements KeyValu List<Tuple> tuples = new ArrayList<>(rows.size()); - for (Row row : schemaReg.resolveKeys(rows)) { + for (Row row : rowConverter.resolveKeys(rows)) { tuples.add(TableRow.keyTuple(row)); } @@ -480,9 +484,9 @@ public class KeyValueBinaryViewImpl extends AbstractTableView implements KeyValu public CompletableFuture<Void> streamData(Publisher<Entry<Tuple, Tuple>> publisher, @Nullable DataStreamerOptions options) { Objects.requireNonNull(publisher); - var partitioner = new KeyValueTupleStreamerPartitionAwarenessProvider(schemaReg, tbl.partitions()); + var partitioner = new KeyValueTupleStreamerPartitionAwarenessProvider(rowConverter.registry(), tbl.partitions()); StreamerBatchSender<Entry<Tuple, Tuple>, Integer> batchSender = - (partitionId, items) -> tbl.upsertAll(marshalPairs(items), partitionId); + (partitionId, items) -> convertToPublicFuture(this.tbl.upsertAll(marshalPairs(items), partitionId)); return DataStreamer.streamData(publisher, options, batchSender, partitioner); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValuePojoStreamerPartitionAwarenessProvider.java b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValuePojoStreamerPartitionAwarenessProvider.java index 6668cd7783..f34b2e49b9 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValuePojoStreamerPartitionAwarenessProvider.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValuePojoStreamerPartitionAwarenessProvider.java @@ -52,7 +52,7 @@ class KeyValuePojoStreamerPartitionAwarenessProvider<K, V> extends AbstractClien return hashCalc.hash(); } catch (MarshallerException e) { - throw new RuntimeException(e); + throw new org.apache.ignite.lang.MarshallerException(e); } } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java index 7c29c161dc..167d656370 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.table; +import static org.apache.ignite.lang.IgniteExceptionMapperUtil.convertToPublicFuture; + import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -87,7 +89,7 @@ public class KeyValueViewImpl<K, V> extends AbstractTableView implements KeyValu public CompletableFuture<V> getAsync(@Nullable Transaction tx, K key) { BinaryRowEx keyRow = marshal(Objects.requireNonNull(key)); - return tbl.get(keyRow, (InternalTransaction) tx).thenApply(this::unmarshallValue); + return convertToPublicFuture(tbl.get(keyRow, (InternalTransaction) tx).thenApply(this::unmarshallValue)); } /** {@inheritDoc} */ @@ -101,7 +103,8 @@ public class KeyValueViewImpl<K, V> extends AbstractTableView implements KeyValu public CompletableFuture<NullableValue<V>> getNullableAsync(@Nullable Transaction tx, K key) { BinaryRowEx keyRow = marshal(Objects.requireNonNull(key)); - return tbl.get(keyRow, (InternalTransaction) tx).thenApply(r -> r == null ? null : NullableValue.of(unmarshalNullableValue(r))); + return convertToPublicFuture(tbl.get(keyRow, (InternalTransaction) tx) + .thenApply(r -> r == null ? null : NullableValue.of(unmarshalNullableValue(r)))); } /** {@inheritDoc} */ @@ -115,7 +118,8 @@ public class KeyValueViewImpl<K, V> extends AbstractTableView implements KeyValu public CompletableFuture<V> getOrDefaultAsync(@Nullable Transaction tx, K key, V defaultValue) { BinaryRowEx keyRow = marshal(Objects.requireNonNull(key)); - return tbl.get(keyRow, (InternalTransaction) tx).thenApply(r -> IgniteUtils.nonNullOrElse(unmarshalNullableValue(r), defaultValue)); + return convertToPublicFuture(tbl.get(keyRow, (InternalTransaction) tx) + .thenApply(r -> IgniteUtils.nonNullOrElse(unmarshalNullableValue(r), defaultValue))); } /** {@inheritDoc} */ @@ -129,7 +133,7 @@ public class KeyValueViewImpl<K, V> extends AbstractTableView implements KeyValu public CompletableFuture<Map<K, V>> getAllAsync(@Nullable Transaction tx, Collection<K> keys) { Collection<BinaryRowEx> rows = marshal(Objects.requireNonNull(keys)); - return tbl.getAll(rows, (InternalTransaction) tx).thenApply(this::unmarshalPairs); + return convertToPublicFuture(tbl.getAll(rows, (InternalTransaction) tx).thenApply(this::unmarshalPairs)); } /** {@inheritDoc} */ @@ -143,7 +147,7 @@ public class KeyValueViewImpl<K, V> extends AbstractTableView implements KeyValu public CompletableFuture<Boolean> containsAsync(@Nullable Transaction tx, K key) { BinaryRowEx keyRow = marshal(Objects.requireNonNull(key)); - return tbl.get(keyRow, (InternalTransaction) tx).thenApply(Objects::nonNull); + return convertToPublicFuture(tbl.get(keyRow, (InternalTransaction) tx).thenApply(Objects::nonNull)); } /** {@inheritDoc} */ @@ -157,7 +161,7 @@ public class KeyValueViewImpl<K, V> extends AbstractTableView implements KeyValu public CompletableFuture<Void> putAsync(@Nullable Transaction tx, K key, V val) { BinaryRowEx row = marshal(Objects.requireNonNull(key), val); - return tbl.upsert(row, (InternalTransaction) tx); + return convertToPublicFuture(tbl.upsert(row, (InternalTransaction) tx)); } /** {@inheritDoc} */ @@ -171,7 +175,7 @@ public class KeyValueViewImpl<K, V> extends AbstractTableView implements KeyValu public CompletableFuture<Void> putAllAsync(@Nullable Transaction tx, Map<K, V> pairs) { Collection<BinaryRowEx> rows = marshalPairs(Objects.requireNonNull(pairs).entrySet()); - return tbl.upsertAll(rows, (InternalTransaction) tx); + return convertToPublicFuture(tbl.upsertAll(rows, (InternalTransaction) tx)); } /** {@inheritDoc} */ @@ -186,7 +190,7 @@ public class KeyValueViewImpl<K, V> extends AbstractTableView implements KeyValu Objects.requireNonNull(key); Objects.requireNonNull(val); - return tbl.getAndUpsert(marshal(key, val), (InternalTransaction) tx).thenApply(this::unmarshallValue); + return convertToPublicFuture(tbl.getAndUpsert(marshal(key, val), (InternalTransaction) tx).thenApply(this::unmarshallValue)); } /** {@inheritDoc} */ @@ -200,8 +204,8 @@ public class KeyValueViewImpl<K, V> extends AbstractTableView implements KeyValu public CompletableFuture<NullableValue<V>> getNullableAndPutAsync(@Nullable Transaction tx, K key, V val) { BinaryRowEx row = marshal(Objects.requireNonNull(key), val); - return tbl.getAndUpsert(row, (InternalTransaction) tx) - .thenApply(r -> r == null ? null : NullableValue.of(unmarshalNullableValue(r))); + return convertToPublicFuture(tbl.getAndUpsert(row, (InternalTransaction) tx) + .thenApply(r -> r == null ? null : NullableValue.of(unmarshalNullableValue(r)))); } /** {@inheritDoc} */ @@ -215,7 +219,7 @@ public class KeyValueViewImpl<K, V> extends AbstractTableView implements KeyValu public CompletableFuture<Boolean> putIfAbsentAsync(@Nullable Transaction tx, K key, V val) { BinaryRowEx row = marshal(Objects.requireNonNull(key), val); - return tbl.insert(row, (InternalTransaction) tx); + return convertToPublicFuture(tbl.insert(row, (InternalTransaction) tx)); } /** {@inheritDoc} */ @@ -235,7 +239,7 @@ public class KeyValueViewImpl<K, V> extends AbstractTableView implements KeyValu public CompletableFuture<Boolean> removeAsync(@Nullable Transaction tx, K key) { BinaryRowEx row = marshal(Objects.requireNonNull(key)); - return tbl.delete(row, (InternalTransaction) tx); + return convertToPublicFuture(tbl.delete(row, (InternalTransaction) tx)); } /** {@inheritDoc} */ @@ -243,7 +247,7 @@ public class KeyValueViewImpl<K, V> extends AbstractTableView implements KeyValu public CompletableFuture<Boolean> removeAsync(@Nullable Transaction tx, K key, V val) { BinaryRowEx row = marshal(Objects.requireNonNull(key), val); - return tbl.deleteExact(row, (InternalTransaction) tx); + return convertToPublicFuture(tbl.deleteExact(row, (InternalTransaction) tx)); } /** {@inheritDoc} */ @@ -257,7 +261,7 @@ public class KeyValueViewImpl<K, V> extends AbstractTableView implements KeyValu public CompletableFuture<Collection<K>> removeAllAsync(@Nullable Transaction tx, Collection<K> keys) { Collection<BinaryRowEx> rows = marshal(Objects.requireNonNull(keys)); - return tbl.deleteAll(rows, (InternalTransaction) tx).thenApply(this::unmarshalKeys); + return convertToPublicFuture(tbl.deleteAll(rows, (InternalTransaction) tx).thenApply(this::unmarshalKeys)); } /** {@inheritDoc} */ @@ -271,7 +275,7 @@ public class KeyValueViewImpl<K, V> extends AbstractTableView implements KeyValu public CompletableFuture<V> getAndRemoveAsync(@Nullable Transaction tx, K key) { BinaryRowEx keyRow = marshal(Objects.requireNonNull(key)); - return tbl.getAndDelete(keyRow, (InternalTransaction) tx).thenApply(this::unmarshallValue); + return convertToPublicFuture(tbl.getAndDelete(keyRow, (InternalTransaction) tx).thenApply(this::unmarshallValue)); } /** {@inheritDoc} */ @@ -285,8 +289,8 @@ public class KeyValueViewImpl<K, V> extends AbstractTableView implements KeyValu public CompletableFuture<NullableValue<V>> getNullableAndRemoveAsync(@Nullable Transaction tx, K key) { BinaryRowEx keyRow = marshal(Objects.requireNonNull(key)); - return tbl.getAndDelete(keyRow, (InternalTransaction) tx) - .thenApply(r -> r == null ? null : NullableValue.of(unmarshalNullableValue(r))); + return convertToPublicFuture(tbl.getAndDelete(keyRow, (InternalTransaction) tx) + .thenApply(r -> r == null ? null : NullableValue.of(unmarshalNullableValue(r)))); } /** {@inheritDoc} */ @@ -306,7 +310,7 @@ public class KeyValueViewImpl<K, V> extends AbstractTableView implements KeyValu public CompletableFuture<Boolean> replaceAsync(@Nullable Transaction tx, K key, V val) { BinaryRowEx row = marshal(Objects.requireNonNull(key), val); - return tbl.replace(row, (InternalTransaction) tx); + return convertToPublicFuture(tbl.replace(row, (InternalTransaction) tx)); } /** {@inheritDoc} */ @@ -317,7 +321,7 @@ public class KeyValueViewImpl<K, V> extends AbstractTableView implements KeyValu BinaryRowEx oldRow = marshal(key, oldVal); BinaryRowEx newRow = marshal(key, newVal); - return tbl.replace(oldRow, newRow, (InternalTransaction) tx); + return convertToPublicFuture(tbl.replace(oldRow, newRow, (InternalTransaction) tx)); } /** {@inheritDoc} */ @@ -332,7 +336,7 @@ public class KeyValueViewImpl<K, V> extends AbstractTableView implements KeyValu Objects.requireNonNull(key); Objects.requireNonNull(val); - return tbl.getAndReplace(marshal(key, val), (InternalTransaction) tx).thenApply(this::unmarshallValue); + return convertToPublicFuture(tbl.getAndReplace(marshal(key, val), (InternalTransaction) tx).thenApply(this::unmarshallValue)); } /** {@inheritDoc} */ @@ -346,8 +350,8 @@ public class KeyValueViewImpl<K, V> extends AbstractTableView implements KeyValu public CompletableFuture<NullableValue<V>> getNullableAndReplaceAsync(@Nullable Transaction tx, K key, V val) { BinaryRowEx row = marshal(Objects.requireNonNull(key), val); - return tbl.getAndReplace(row, (InternalTransaction) tx) - .thenApply(r -> r == null ? null : NullableValue.of(unmarshalNullableValue(r))); + return convertToPublicFuture(tbl.getAndReplace(row, (InternalTransaction) tx) + .thenApply(r -> r == null ? null : NullableValue.of(unmarshalNullableValue(r)))); } /** @@ -364,7 +368,8 @@ public class KeyValueViewImpl<K, V> extends AbstractTableView implements KeyValu // TODO: Cache marshaller for schema version or upgrade row? - return this.marsh = marshallerFactory.apply(schemaReg.schema(schemaVersion)); + SchemaRegistry registry = rowConverter.registry(); + return this.marsh = marshallerFactory.apply(registry.schema(schemaVersion)); } /** @@ -373,7 +378,8 @@ public class KeyValueViewImpl<K, V> extends AbstractTableView implements KeyValu * @return Marshaller. */ private KvMarshaller<K, V> marshaller() { - return marshaller(schemaReg.lastSchemaVersion()); + SchemaRegistry registry = rowConverter.registry(); + return marshaller(registry.lastSchemaVersion()); } /** @@ -477,7 +483,7 @@ public class KeyValueViewImpl<K, V> extends AbstractTableView implements KeyValu List<K> keys = new ArrayList<>(rows.size()); try { - for (Row row : schemaReg.resolveKeys(rows)) { + for (Row row : rowConverter.resolveKeys(rows)) { if (row != null) { keys.add(marsh.unmarshalKey(row)); } @@ -500,7 +506,7 @@ public class KeyValueViewImpl<K, V> extends AbstractTableView implements KeyValu return null; } - Row row = schemaReg.resolve(binaryRow); + Row row = rowConverter.resolveRow(binaryRow); KvMarshaller<K, V> marshaller = marshaller(row.schemaVersion()); @@ -527,7 +533,7 @@ public class KeyValueViewImpl<K, V> extends AbstractTableView implements KeyValu Map<K, V> pairs = IgniteUtils.newHashMap(rows.size()); try { - for (Row row : schemaReg.resolve(rows)) { + for (Row row : rowConverter.resolveRows(rows)) { if (row != null) { pairs.put(marsh.unmarshalKey(row), marsh.unmarshalValue(row)); } @@ -555,7 +561,7 @@ public class KeyValueViewImpl<K, V> extends AbstractTableView implements KeyValu V v = unmarshalNullableValue(binaryRow); if (v == null) { - throw new UnexpectedNullValueException("use `getNullable` sibling method instead."); + throw new UnexpectedNullValueException("Got unexpected null value: use `getNullable` sibling method instead."); } return v; @@ -566,8 +572,9 @@ public class KeyValueViewImpl<K, V> extends AbstractTableView implements KeyValu public CompletableFuture<Void> streamData(Publisher<Entry<K, V>> publisher, @Nullable DataStreamerOptions options) { Objects.requireNonNull(publisher); - var partitioner = new KeyValuePojoStreamerPartitionAwarenessProvider<>(schemaReg, tbl.partitions(), marshaller()); - StreamerBatchSender<Entry<K, V>, Integer> batchSender = (partitionId, items) -> tbl.upsertAll(marshalPairs(items), partitionId); + var partitioner = new KeyValuePojoStreamerPartitionAwarenessProvider<>(rowConverter.registry(), tbl.partitions(), marshaller()); + StreamerBatchSender<Entry<K, V>, Integer> batchSender = (partitionId, items) + -> convertToPublicFuture(this.tbl.upsertAll(marshalPairs(items), partitionId)); return DataStreamer.streamData(publisher, options, batchSender, partitioner); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java index 5414446c11..327bb45839 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.table; +import static org.apache.ignite.lang.IgniteExceptionMapperUtil.convertToPublicFuture; + import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -33,6 +35,7 @@ import org.apache.ignite.internal.schema.row.Row; import org.apache.ignite.internal.streamer.StreamerBatchSender; import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.lang.IgniteException; +import org.apache.ignite.lang.MarshallerException; import org.apache.ignite.table.DataStreamerOptions; import org.apache.ignite.table.RecordView; import org.apache.ignite.table.Tuple; @@ -71,7 +74,7 @@ public class RecordBinaryViewImpl extends AbstractTableView implements RecordVie final Row keyRow = marshal(keyRec, true); // Convert to portable format to pass TX/storage layer. - return tbl.get(keyRow, (InternalTransaction) tx).thenApply(this::wrap); + return convertToPublicFuture(tbl.get(keyRow, (InternalTransaction) tx).thenApply(this::wrap)); } @Override @@ -83,7 +86,8 @@ public class RecordBinaryViewImpl extends AbstractTableView implements RecordVie public CompletableFuture<List<Tuple>> getAllAsync(@Nullable Transaction tx, Collection<Tuple> keyRecs) { Objects.requireNonNull(keyRecs); - return tbl.getAll(mapToBinary(keyRecs, true), (InternalTransaction) tx).thenApply(binaryRows -> wrap(binaryRows, true)); + return convertToPublicFuture(tbl.getAll(mapToBinary(keyRecs, true), (InternalTransaction) tx) + .thenApply(binaryRows -> wrap(binaryRows, true))); } /** {@inheritDoc} */ @@ -99,7 +103,7 @@ public class RecordBinaryViewImpl extends AbstractTableView implements RecordVie final Row row = marshal(rec, false); - return tbl.upsert(row, (InternalTransaction) tx); + return convertToPublicFuture(tbl.upsert(row, (InternalTransaction) tx)); } /** {@inheritDoc} */ @@ -113,7 +117,7 @@ public class RecordBinaryViewImpl extends AbstractTableView implements RecordVie public CompletableFuture<Void> upsertAllAsync(@Nullable Transaction tx, Collection<Tuple> recs) { Objects.requireNonNull(recs); - return tbl.upsertAll(mapToBinary(recs, false), (InternalTransaction) tx); + return convertToPublicFuture(tbl.upsertAll(mapToBinary(recs, false), (InternalTransaction) tx)); } /** {@inheritDoc} */ @@ -129,7 +133,7 @@ public class RecordBinaryViewImpl extends AbstractTableView implements RecordVie final Row row = marshal(rec, false); - return tbl.getAndUpsert(row, (InternalTransaction) tx).thenApply(this::wrap); + return convertToPublicFuture(tbl.getAndUpsert(row, (InternalTransaction) tx).thenApply(this::wrap)); } /** {@inheritDoc} */ @@ -145,7 +149,7 @@ public class RecordBinaryViewImpl extends AbstractTableView implements RecordVie final Row row = marshal(rec, false); - return tbl.insert(row, (InternalTransaction) tx); + return convertToPublicFuture(tbl.insert(row, (InternalTransaction) tx)); } /** {@inheritDoc} */ @@ -159,7 +163,8 @@ public class RecordBinaryViewImpl extends AbstractTableView implements RecordVie public CompletableFuture<Collection<Tuple>> insertAllAsync(@Nullable Transaction tx, Collection<Tuple> recs) { Objects.requireNonNull(recs); - return tbl.insertAll(mapToBinary(recs, false), (InternalTransaction) tx).thenApply(rows -> wrap(rows, false)); + return convertToPublicFuture(tbl.insertAll(mapToBinary(recs, false), (InternalTransaction) tx) + .thenApply(rows -> wrap(rows, false))); } /** {@inheritDoc} */ @@ -181,7 +186,7 @@ public class RecordBinaryViewImpl extends AbstractTableView implements RecordVie final Row row = marshal(rec, false); - return tbl.replace(row, (InternalTransaction) tx); + return convertToPublicFuture(tbl.replace(row, (InternalTransaction) tx)); } /** {@inheritDoc} */ @@ -193,7 +198,7 @@ public class RecordBinaryViewImpl extends AbstractTableView implements RecordVie final Row oldRow = marshal(oldRec, false); final Row newRow = marshal(newRec, false); - return tbl.replace(oldRow, newRow, (InternalTransaction) tx); + return convertToPublicFuture(tbl.replace(oldRow, newRow, (InternalTransaction) tx)); } /** {@inheritDoc} */ @@ -209,7 +214,7 @@ public class RecordBinaryViewImpl extends AbstractTableView implements RecordVie final Row row = marshal(rec, false); - return tbl.getAndReplace(row, (InternalTransaction) tx).thenApply(this::wrap); + return convertToPublicFuture(tbl.getAndReplace(row, (InternalTransaction) tx).thenApply(this::wrap)); } /** {@inheritDoc} */ @@ -225,7 +230,7 @@ public class RecordBinaryViewImpl extends AbstractTableView implements RecordVie final Row keyRow = marshal(keyRec, true); - return tbl.delete(keyRow, (InternalTransaction) tx); + return convertToPublicFuture(tbl.delete(keyRow, (InternalTransaction) tx)); } /** {@inheritDoc} */ @@ -241,7 +246,7 @@ public class RecordBinaryViewImpl extends AbstractTableView implements RecordVie final Row row = marshal(rec, false); - return tbl.deleteExact(row, (InternalTransaction) tx); + return convertToPublicFuture(tbl.deleteExact(row, (InternalTransaction) tx)); } /** {@inheritDoc} */ @@ -257,7 +262,7 @@ public class RecordBinaryViewImpl extends AbstractTableView implements RecordVie final Row keyRow = marshal(keyRec, true); - return tbl.getAndDelete(keyRow, (InternalTransaction) tx).thenApply(this::wrap); + return convertToPublicFuture(tbl.getAndDelete(keyRow, (InternalTransaction) tx).thenApply(this::wrap)); } /** {@inheritDoc} */ @@ -271,7 +276,7 @@ public class RecordBinaryViewImpl extends AbstractTableView implements RecordVie public CompletableFuture<Collection<Tuple>> deleteAllAsync(@Nullable Transaction tx, Collection<Tuple> keyRecs) { Objects.requireNonNull(keyRecs); - return tbl.deleteAll(mapToBinary(keyRecs, true), (InternalTransaction) tx).thenApply(this::wrapKeys); + return convertToPublicFuture(tbl.deleteAll(mapToBinary(keyRecs, true), (InternalTransaction) tx).thenApply(this::wrapKeys)); } /** {@inheritDoc} */ @@ -285,7 +290,8 @@ public class RecordBinaryViewImpl extends AbstractTableView implements RecordVie public CompletableFuture<Collection<Tuple>> deleteAllExactAsync(@Nullable Transaction tx, Collection<Tuple> recs) { Objects.requireNonNull(recs); - return tbl.deleteAllExact(mapToBinary(recs, false), (InternalTransaction) tx).thenApply(rows -> wrap(rows, false)); + return convertToPublicFuture(tbl.deleteAllExact(mapToBinary(recs, false), (InternalTransaction) tx) + .thenApply(rows -> wrap(rows, false))); } /** @@ -304,7 +310,7 @@ public class RecordBinaryViewImpl extends AbstractTableView implements RecordVie return marsh.marshal(tuple); } } catch (TupleMarshallerException ex) { - throw convertException(ex); + throw new MarshallerException(ex); } } @@ -314,7 +320,7 @@ public class RecordBinaryViewImpl extends AbstractTableView implements RecordVie * @param row Binary row. */ private @Nullable Tuple wrap(@Nullable BinaryRow row) { - return row == null ? null : TableRow.tuple(schemaReg.resolve(row)); + return row == null ? null : TableRow.tuple(rowConverter.resolveRow(row)); } /** @@ -330,7 +336,7 @@ public class RecordBinaryViewImpl extends AbstractTableView implements RecordVie var wrapped = new ArrayList<Tuple>(rows.size()); - for (Row row : schemaReg.resolve(rows)) { + for (Row row : rowConverter.resolveRows(rows)) { if (row != null) { wrapped.add(TableRow.tuple(row)); } else if (addNull) { @@ -348,7 +354,7 @@ public class RecordBinaryViewImpl extends AbstractTableView implements RecordVie var wrapped = new ArrayList<Tuple>(rows.size()); - for (Row row : schemaReg.resolveKeys(rows)) { + for (Row row : rowConverter.resolveKeys(rows)) { if (row != null) { wrapped.add(TableRow.tuple(row)); } @@ -379,8 +385,9 @@ public class RecordBinaryViewImpl extends AbstractTableView implements RecordVie public CompletableFuture<Void> streamData(Publisher<Tuple> publisher, @Nullable DataStreamerOptions options) { Objects.requireNonNull(publisher); - var partitioner = new TupleStreamerPartitionAwarenessProvider(schemaReg, tbl.partitions()); - StreamerBatchSender<Tuple, Integer> batchSender = (partitionId, items) -> tbl.upsertAll(mapToBinary(items, false), partitionId); + var partitioner = new TupleStreamerPartitionAwarenessProvider(rowConverter.registry(), tbl.partitions()); + StreamerBatchSender<Tuple, Integer> batchSender = (partitionId, items) -> + convertToPublicFuture(this.tbl.upsertAll(mapToBinary(items, false), partitionId)); return DataStreamer.streamData(publisher, options, batchSender, partitioner); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java index eeebd2e02e..5229eaa2c4 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.table; +import static org.apache.ignite.lang.IgniteExceptionMapperUtil.convertToPublicFuture; + import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -35,7 +37,6 @@ import org.apache.ignite.internal.schema.marshaller.reflection.RecordMarshallerI import org.apache.ignite.internal.schema.row.Row; import org.apache.ignite.internal.streamer.StreamerBatchSender; import org.apache.ignite.internal.tx.InternalTransaction; -import org.apache.ignite.lang.IgniteException; import org.apache.ignite.table.DataStreamerOptions; import org.apache.ignite.table.RecordView; import org.apache.ignite.table.mapper.Mapper; @@ -76,7 +77,7 @@ public class RecordViewImpl<R> extends AbstractTableView implements RecordView<R public CompletableFuture<R> getAsync(@Nullable Transaction tx, R keyRec) { BinaryRowEx keyRow = marshalKey(Objects.requireNonNull(keyRec)); - return tbl.get(keyRow, (InternalTransaction) tx).thenApply(this::unmarshal); + return convertToPublicFuture(tbl.get(keyRow, (InternalTransaction) tx).thenApply(this::unmarshal)); } @Override @@ -88,7 +89,8 @@ public class RecordViewImpl<R> extends AbstractTableView implements RecordView<R public CompletableFuture<List<R>> getAllAsync(@Nullable Transaction tx, Collection<R> keyRecs) { Objects.requireNonNull(keyRecs); - return tbl.getAll(marshalKeys(keyRecs), (InternalTransaction) tx).thenApply(binaryRows -> unmarshal(binaryRows, true)); + return convertToPublicFuture(tbl.getAll(marshalKeys(keyRecs), (InternalTransaction) tx) + .thenApply(binaryRows -> unmarshal(binaryRows, true))); } /** {@inheritDoc} */ @@ -102,7 +104,7 @@ public class RecordViewImpl<R> extends AbstractTableView implements RecordView<R public CompletableFuture<Void> upsertAsync(@Nullable Transaction tx, R rec) { BinaryRowEx keyRow = marshal(Objects.requireNonNull(rec)); - return tbl.upsert(keyRow, (InternalTransaction) tx); + return convertToPublicFuture(tbl.upsert(keyRow, (InternalTransaction) tx)); } /** {@inheritDoc} */ @@ -116,7 +118,7 @@ public class RecordViewImpl<R> extends AbstractTableView implements RecordView<R public CompletableFuture<Void> upsertAllAsync(@Nullable Transaction tx, Collection<R> recs) { Objects.requireNonNull(recs); - return tbl.upsertAll(marshal(recs), (InternalTransaction) tx); + return convertToPublicFuture(tbl.upsertAll(marshal(recs), (InternalTransaction) tx)); } /** {@inheritDoc} */ @@ -130,7 +132,7 @@ public class RecordViewImpl<R> extends AbstractTableView implements RecordView<R public CompletableFuture<R> getAndUpsertAsync(@Nullable Transaction tx, R rec) { BinaryRowEx keyRow = marshal(Objects.requireNonNull(rec)); - return tbl.getAndUpsert(keyRow, (InternalTransaction) tx).thenApply(this::unmarshal); + return convertToPublicFuture(tbl.getAndUpsert(keyRow, (InternalTransaction) tx).thenApply(this::unmarshal)); } /** {@inheritDoc} */ @@ -144,7 +146,7 @@ public class RecordViewImpl<R> extends AbstractTableView implements RecordView<R public CompletableFuture<Boolean> insertAsync(@Nullable Transaction tx, R rec) { BinaryRowEx keyRow = marshal(Objects.requireNonNull(rec)); - return tbl.insert(keyRow, (InternalTransaction) tx); + return convertToPublicFuture(tbl.insert(keyRow, (InternalTransaction) tx)); } /** {@inheritDoc} */ @@ -158,7 +160,7 @@ public class RecordViewImpl<R> extends AbstractTableView implements RecordView<R public CompletableFuture<Collection<R>> insertAllAsync(@Nullable Transaction tx, Collection<R> recs) { Collection<BinaryRowEx> rows = marshal(Objects.requireNonNull(recs)); - return tbl.insertAll(rows, (InternalTransaction) tx).thenApply(binaryRows -> unmarshal(binaryRows, false)); + return convertToPublicFuture(tbl.insertAll(rows, (InternalTransaction) tx).thenApply(binaryRows -> unmarshal(binaryRows, false))); } /** {@inheritDoc} */ @@ -178,7 +180,7 @@ public class RecordViewImpl<R> extends AbstractTableView implements RecordView<R public CompletableFuture<Boolean> replaceAsync(@Nullable Transaction tx, R rec) { BinaryRowEx newRow = marshal(Objects.requireNonNull(rec)); - return tbl.replace(newRow, (InternalTransaction) tx); + return convertToPublicFuture(tbl.replace(newRow, (InternalTransaction) tx)); } /** {@inheritDoc} */ @@ -187,7 +189,7 @@ public class RecordViewImpl<R> extends AbstractTableView implements RecordView<R BinaryRowEx oldRow = marshal(Objects.requireNonNull(oldRec)); BinaryRowEx newRow = marshal(Objects.requireNonNull(newRec)); - return tbl.replace(oldRow, newRow, (InternalTransaction) tx); + return convertToPublicFuture(tbl.replace(oldRow, newRow, (InternalTransaction) tx)); } /** {@inheritDoc} */ @@ -201,7 +203,7 @@ public class RecordViewImpl<R> extends AbstractTableView implements RecordView<R public CompletableFuture<R> getAndReplaceAsync(@Nullable Transaction tx, R rec) { BinaryRowEx row = marshal(Objects.requireNonNull(rec)); - return tbl.getAndReplace(row, (InternalTransaction) tx).thenApply(this::unmarshal); + return convertToPublicFuture(tbl.getAndReplace(row, (InternalTransaction) tx).thenApply(this::unmarshal)); } /** {@inheritDoc} */ @@ -215,7 +217,7 @@ public class RecordViewImpl<R> extends AbstractTableView implements RecordView<R public CompletableFuture<Boolean> deleteAsync(@Nullable Transaction tx, R keyRec) { BinaryRowEx row = marshalKey(Objects.requireNonNull(keyRec)); - return tbl.delete(row, (InternalTransaction) tx); + return convertToPublicFuture(tbl.delete(row, (InternalTransaction) tx)); } /** {@inheritDoc} */ @@ -229,7 +231,7 @@ public class RecordViewImpl<R> extends AbstractTableView implements RecordView<R public CompletableFuture<Boolean> deleteExactAsync(@Nullable Transaction tx, R keyRec) { BinaryRowEx row = marshal(Objects.requireNonNull(keyRec)); - return tbl.deleteExact(row, (InternalTransaction) tx); + return convertToPublicFuture(tbl.deleteExact(row, (InternalTransaction) tx)); } /** {@inheritDoc} */ @@ -243,7 +245,7 @@ public class RecordViewImpl<R> extends AbstractTableView implements RecordView<R public CompletableFuture<R> getAndDeleteAsync(@Nullable Transaction tx, R keyRec) { BinaryRowEx row = marshalKey(keyRec); - return tbl.getAndDelete(row, (InternalTransaction) tx).thenApply(this::unmarshal); + return convertToPublicFuture(tbl.getAndDelete(row, (InternalTransaction) tx).thenApply(this::unmarshal)); } /** {@inheritDoc} */ @@ -257,7 +259,7 @@ public class RecordViewImpl<R> extends AbstractTableView implements RecordView<R public CompletableFuture<Collection<R>> deleteAllAsync(@Nullable Transaction tx, Collection<R> keyRecs) { Collection<BinaryRowEx> rows = marshal(Objects.requireNonNull(keyRecs)); - return tbl.deleteAll(rows, (InternalTransaction) tx).thenApply(binaryRows -> unmarshal(binaryRows, false)); + return convertToPublicFuture(tbl.deleteAll(rows, (InternalTransaction) tx).thenApply(binaryRows -> unmarshal(binaryRows, false))); } /** {@inheritDoc} */ @@ -271,7 +273,8 @@ public class RecordViewImpl<R> extends AbstractTableView implements RecordView<R public CompletableFuture<Collection<R>> deleteAllExactAsync(@Nullable Transaction tx, Collection<R> keyRecs) { Collection<BinaryRowEx> rows = marshal(Objects.requireNonNull(keyRecs)); - return tbl.deleteAllExact(rows, (InternalTransaction) tx).thenApply(binaryRows -> unmarshal(binaryRows, false)); + return convertToPublicFuture(tbl.deleteAllExact(rows, (InternalTransaction) tx) + .thenApply(binaryRows -> unmarshal(binaryRows, false))); } /** @@ -289,7 +292,8 @@ public class RecordViewImpl<R> extends AbstractTableView implements RecordView<R // TODO: Cache marshaller for schema version or upgrade row? - return this.marsh = marshallerFactory.apply(schemaReg.schema(schemaVersion)); + SchemaDescriptor schema = rowConverter.registry().schema(schemaVersion); + return this.marsh = marshallerFactory.apply(schema); } /** @@ -298,6 +302,7 @@ public class RecordViewImpl<R> extends AbstractTableView implements RecordView<R * @return Marshaller. */ private RecordMarshaller<R> marshaller() { + SchemaRegistry schemaReg = rowConverter.registry(); return marshaller(schemaReg.lastSchemaVersion()); } @@ -313,7 +318,7 @@ public class RecordViewImpl<R> extends AbstractTableView implements RecordView<R try { return marsh.marshal(rec); } catch (MarshallerException e) { - throw new IgniteException(e); + throw new org.apache.ignite.lang.MarshallerException(e); } } @@ -337,7 +342,7 @@ public class RecordViewImpl<R> extends AbstractTableView implements RecordView<R return rows; } catch (MarshallerException e) { - throw new IgniteException(e); + throw new org.apache.ignite.lang.MarshallerException(e); } } @@ -353,7 +358,7 @@ public class RecordViewImpl<R> extends AbstractTableView implements RecordView<R try { return marsh.marshalKey(rec); } catch (MarshallerException e) { - throw new IgniteException(e); + throw new org.apache.ignite.lang.MarshallerException(e); } } @@ -377,7 +382,7 @@ public class RecordViewImpl<R> extends AbstractTableView implements RecordView<R return rows; } catch (MarshallerException e) { - throw new IgniteException(e); + throw new org.apache.ignite.lang.MarshallerException(e); } } @@ -392,14 +397,14 @@ public class RecordViewImpl<R> extends AbstractTableView implements RecordView<R return null; } - Row row = schemaReg.resolve(binaryRow); + Row row = rowConverter.resolveRow(binaryRow); RecordMarshaller<R> marshaller = marshaller(row.schemaVersion()); try { return marshaller.unmarshal(row); } catch (MarshallerException e) { - throw new IgniteException(e); + throw new org.apache.ignite.lang.MarshallerException(e); } } @@ -420,7 +425,7 @@ public class RecordViewImpl<R> extends AbstractTableView implements RecordView<R var recs = new ArrayList<R>(rows.size()); try { - for (Row row : schemaReg.resolve(rows)) { + for (Row row : rowConverter.resolveRows(rows)) { if (row != null) { recs.add(marsh.unmarshal(row)); } else if (addNull) { @@ -430,7 +435,7 @@ public class RecordViewImpl<R> extends AbstractTableView implements RecordView<R return recs; } catch (MarshallerException e) { - throw new IgniteException(e); + throw new org.apache.ignite.lang.MarshallerException(e); } } @@ -439,8 +444,8 @@ public class RecordViewImpl<R> extends AbstractTableView implements RecordView<R public CompletableFuture<Void> streamData(Publisher<R> publisher, @Nullable DataStreamerOptions options) { Objects.requireNonNull(publisher); - var partitioner = new PojoStreamerPartitionAwarenessProvider<>(schemaReg, tbl.partitions(), marshaller()); - StreamerBatchSender<R, Integer> batchSender = (partitionId, items) -> tbl.upsertAll(marshal(items), partitionId); + var partitioner = new PojoStreamerPartitionAwarenessProvider<>(rowConverter.registry(), tbl.partitions(), marshaller()); + StreamerBatchSender<R, Integer> batchSender = (partitionId, items) -> this.tbl.upsertAll(marshal(items), partitionId); return DataStreamer.streamData(publisher, options, batchSender, partitioner); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TableViewRowConverter.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TableViewRowConverter.java new file mode 100644 index 0000000000..a81aabdfe0 --- /dev/null +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TableViewRowConverter.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table; + +import java.util.Collection; +import java.util.List; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.schema.SchemaRegistry; +import org.apache.ignite.internal.schema.registry.SchemaRegistryException; +import org.apache.ignite.internal.schema.row.Row; +import org.apache.ignite.lang.MarshallerException; + +/** + * Converts {@link BinaryRow binary rows} to {@link Row rows} using {@link SchemaRegistry}. + */ +final class TableViewRowConverter { + + private final SchemaRegistry schemaReg; + + TableViewRowConverter(SchemaRegistry schemaReg) { + this.schemaReg = schemaReg; + } + + SchemaRegistry registry() { + return schemaReg; + } + + Row resolveRow(BinaryRow binaryRow) { + try { + return schemaReg.resolve(binaryRow); + } catch (SchemaRegistryException e) { + throw new MarshallerException(e); + } + } + + List<Row> resolveKeys(Collection<BinaryRow> rows) { + try { + return schemaReg.resolveKeys(rows); + } catch (SchemaRegistryException e) { + throw new MarshallerException(e); + } + } + + List<Row> resolveRows(Collection<BinaryRow> rows) { + try { + return schemaReg.resolve(rows); + } catch (SchemaRegistryException e) { + throw new MarshallerException(e); + } + } +}