This is an automated email from the ASF dual-hosted git repository. korlov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 405db0c3d8 IGNITE-21506 Refactoring of tuple marshaller (#3195) 405db0c3d8 is described below commit 405db0c3d8be72f7833cac993b96f42aa3628e1c Author: korlov42 <kor...@gridgain.com> AuthorDate: Mon Feb 12 14:55:00 2024 +0200 IGNITE-21506 Refactoring of tuple marshaller (#3195) --- .../internal/table/ItReadOnlyTransactionTest.java | 2 +- .../ignite/internal/table/ItTableScanTest.java | 4 +- .../ignite/internal/schema/SchemaDescriptor.java | 18 +- .../marshaller/reflection/ObjectStatistics.java | 3 +- .../ignite/internal/schema/row/RowAssembler.java | 77 ++---- .../org/apache/ignite/internal/schema/RowTest.java | 2 +- .../schema/registry/UpgradingRowAdapterTest.java | 2 +- .../ItAbstractInternalTableScanTest.java | 2 +- .../ItInternalTableReadOnlyOperationsTest.java | 2 +- .../ignite/distributed/ReplicaUnavailableTest.java | 2 +- .../schema/marshaller/TupleMarshallerImpl.java | 286 ++++++--------------- .../table/ColocationHashCalculationTest.java | 2 +- .../raft/PartitionCommandListenerTest.java | 2 +- .../incoming/IncomingSnapshotCopierTest.java | 2 +- 14 files changed, 127 insertions(+), 279 deletions(-) diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReadOnlyTransactionTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReadOnlyTransactionTest.java index 29beb2810e..27283558a9 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReadOnlyTransactionTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReadOnlyTransactionTest.java @@ -177,7 +177,7 @@ public class ItReadOnlyTransactionTest extends ClusterPerClassIntegrationTest { } private static Row createRow(SchemaDescriptor schema, int id) { - RowAssembler rowBuilder = new RowAssembler(schema); + RowAssembler rowBuilder = new RowAssembler(schema, -1); rowBuilder.appendInt(id); 
rowBuilder.appendString("new str " + id); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java index 81459bed3c..f600f80b4e 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java @@ -955,7 +955,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest { * @return Entire row. */ private Row createKeyValueRow(int id) { - RowAssembler rowBuilder = new RowAssembler(schema); + RowAssembler rowBuilder = new RowAssembler(schema, -1); rowBuilder.appendInt(id); rowBuilder.appendInt(id); @@ -971,7 +971,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest { * @return Entire row. */ private Row createOldKeyValueRow(int id) { - RowAssembler rowBuilder = new RowAssembler(schema); + RowAssembler rowBuilder = new RowAssembler(schema, -1); rowBuilder.appendInt(id); rowBuilder.appendInt(id); diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaDescriptor.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaDescriptor.java index 845af4ffb3..4de80f6c8d 100644 --- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaDescriptor.java +++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaDescriptor.java @@ -23,9 +23,11 @@ import java.util.Arrays; import java.util.Collection; import java.util.Comparator; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.ignite.internal.marshaller.MarshallerColumn; import org.apache.ignite.internal.marshaller.MarshallerSchema; @@ -53,6 +55,8 @@ public class 
SchemaDescriptor { /** Colocation columns. */ private final Column[] colocationCols; + private final List<Column> columns; + /** Colocation columns. */ private final @Nullable Map<Column, Integer> colocationColIndexes; @@ -96,6 +100,11 @@ public class SchemaDescriptor { this.keyCols = new Columns(0, keyCols); this.valCols = new Columns(keyCols.length, valCols); + this.columns = Stream.concat( + Arrays.stream(this.keyCols.columns()), + Arrays.stream(this.valCols.columns()) + ).collect(Collectors.toList()); + assert this.keyCols.nullMapSize() == 0 : "Primary key cannot contain nullable column [cols=" + this.keyCols + ']'; colMap = newLinkedHashMap(keyCols.length + valCols.length); @@ -162,7 +171,7 @@ public class SchemaDescriptor { public Column column(int colIdx) { validateColumnIndex(colIdx); - return colIdx < keyCols.length() ? keyCols.column(colIdx) : valCols.column(colIdx - keyCols.length()); + return columns.get(colIdx); } /** @@ -175,6 +184,11 @@ public class SchemaDescriptor { return colMap.get(name); } + /** Returns columns in the order they appear in serialized tuple. */ + public List<Column> columns() { + return columns; + } + /** * Validates the column index. * @@ -243,7 +257,7 @@ public class SchemaDescriptor { * @return Total number of columns in schema. 
*/ public int length() { - return keyCols.length() + valCols.length(); + return columns.size(); } /** diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/ObjectStatistics.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/ObjectStatistics.java index 36040a9009..746341a0c8 100644 --- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/ObjectStatistics.java +++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/ObjectStatistics.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.schema.marshaller.reflection; import static org.apache.ignite.internal.schema.marshaller.MarshallerUtil.getValueSize; +import java.util.List; import org.apache.ignite.internal.marshaller.Marshaller; import org.apache.ignite.internal.schema.Column; import org.apache.ignite.internal.schema.Columns; @@ -82,7 +83,7 @@ class ObjectStatistics { int totalValueSize = keyStat.getEstimatedValueSize(); - return new RowAssembler(schema.keyColumns(), null, schema.version(), totalValueSize); + return new RowAssembler(schema.version(), List.of(schema.keyColumns().columns()), totalValueSize); } static RowAssembler createAssembler( diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/RowAssembler.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/RowAssembler.java index 5e25799e48..e28fe38100 100644 --- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/RowAssembler.java +++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/RowAssembler.java @@ -25,13 +25,13 @@ import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; import java.util.BitSet; +import java.util.List; import java.util.UUID; import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder; import org.apache.ignite.internal.schema.AssemblyException; import 
org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.BinaryRowImpl; import org.apache.ignite.internal.schema.Column; -import org.apache.ignite.internal.schema.Columns; import org.apache.ignite.internal.schema.InvalidTypeException; import org.apache.ignite.internal.schema.SchemaDescriptor; import org.apache.ignite.internal.schema.SchemaMismatchException; @@ -55,33 +55,13 @@ import org.jetbrains.annotations.Nullable; * <p>Natively supported temporal types are encoded automatically with preserving sort order before writing. */ public class RowAssembler { - /** Key columns. */ - private final Columns keyColumns; - - /** Value columns. */ - private final Columns valueColumns; - - /** Schema version. */ private final int schemaVersion; - - /** Binary tuple builder. */ + private final List<Column> columns; private final BinaryTupleBuilder builder; - /** Current columns chunk. */ - private Columns curCols; - /** Current field index (the field is unset). */ private int curCol; - /** - * Creates a builder. - * - * @param schema Schema descriptor. - */ - public RowAssembler(SchemaDescriptor schema) { - this(schema, -1); - } - /** * Creates a builder. * @@ -89,24 +69,21 @@ public class RowAssembler { * @param totalValueSize Total estimated length of non-NULL values, -1 if not known. */ public RowAssembler(SchemaDescriptor schema, int totalValueSize) { - this(schema.keyColumns(), schema.valueColumns(), schema.version(), totalValueSize); + this(schema.version(), schema.columns(), totalValueSize); } /** - * Creates a builder. + * Create a builder. * - * @param keyColumns Key columns. - * @param valueColumns Value columns, {@code null} if only key should be assembled. - * @param schemaVersion Schema version. + * @param schemaVersion Version of the schema. + * @param columns List of columns to serialize. Values must be appended in the same order. * @param totalValueSize Total estimated length of non-NULL values, -1 if not known. 
*/ - public RowAssembler(Columns keyColumns, @Nullable Columns valueColumns, int schemaVersion, int totalValueSize) { - this.keyColumns = keyColumns; - this.valueColumns = valueColumns; + public RowAssembler(int schemaVersion, List<Column> columns, int totalValueSize) { this.schemaVersion = schemaVersion; - int numElements = keyColumns.length() + (valueColumns != null ? valueColumns.length() : 0); - builder = new BinaryTupleBuilder(numElements, totalValueSize); - curCols = keyColumns; + this.columns = columns; + + builder = new BinaryTupleBuilder(columns.size(), totalValueSize); curCol = 0; } @@ -121,7 +98,7 @@ public class RowAssembler { return appendNull(); } - NativeType columnType = curCols.column(curCol).type(); + NativeType columnType = columns.get(curCol).type(); switch (columnType.spec()) { case BOOLEAN: { @@ -187,9 +164,9 @@ public class RowAssembler { * @throws SchemaMismatchException If the current column is not nullable. */ public RowAssembler appendNull() throws SchemaMismatchException { - if (!curCols.column(curCol).nullable()) { + if (!columns.get(curCol).nullable()) { throw new SchemaMismatchException( - "Failed to set column (null was passed, but column is not nullable): " + curCols.column(curCol)); + "Failed to set column (null was passed, but column is not nullable): " + columns.get(curCol)); } builder.appendNull(); @@ -205,7 +182,7 @@ public class RowAssembler { * @return {@code this} for chaining. 
*/ public RowAssembler appendDefault() { - Column column = curCols.column(curCol); + Column column = columns.get(curCol); return appendValue(column.defaultValue()); } @@ -367,7 +344,7 @@ public class RowAssembler { public RowAssembler appendNumberNotNull(BigInteger val) throws SchemaMismatchException { checkType(NativeTypeSpec.NUMBER); - Column col = curCols.column(curCol); + Column col = columns.get(curCol); NumberNativeType type = (NumberNativeType) col.type(); @@ -399,7 +376,7 @@ public class RowAssembler { public RowAssembler appendDecimalNotNull(BigDecimal val) throws SchemaMismatchException { checkType(NativeTypeSpec.DECIMAL); - Column col = curCols.column(curCol); + Column col = columns.get(curCol); DecimalNativeType type = (DecimalNativeType) col.type(); @@ -491,7 +468,7 @@ public class RowAssembler { * @throws SchemaMismatchException If a value doesn't match the current column type. */ public RowAssembler appendBitmaskNotNull(BitSet bitSet) throws SchemaMismatchException { - Column col = curCols.column(curCol); + Column col = columns.get(curCol); checkType(NativeTypeSpec.BITMASK); @@ -603,7 +580,7 @@ public class RowAssembler { } private int normalizeNanos(int nanos) { - NativeType type = curCols.column(curCol).type(); + NativeType type = columns.get(curCol).type(); return TemporalTypeUtils.normalizeNanos(nanos, ((TemporalNativeType) type).precision()); } @@ -614,9 +591,7 @@ public class RowAssembler { * @return Created {@link BinaryRow}. */ public BinaryRow build() { - if (keyColumns == curCols) { - throw new AssemblyException("Key column missed: colIdx=" + curCol); - } else if (curCol != 0 && valueColumns.length() != curCol) { + if (curCol != 0 && columns.size() != curCol) { throw new AssemblyException("Value column missed: colIdx=" + curCol); } @@ -630,11 +605,7 @@ public class RowAssembler { * @throws SchemaMismatchException If given type doesn't match the current column type. 
*/ private void checkType(NativeTypeSpec type) { - if (curCols == null) { - throw new SchemaMismatchException("Failed to set column, expected key only but tried to add " + type.name()); - } - - Column col = curCols.column(curCol); + Column col = columns.get(curCol); // Column#validate does not work here, because we must tolerate differences in precision, size, etc. if (col.type().spec() != type) { @@ -659,12 +630,6 @@ public class RowAssembler { */ private void shiftColumn() { curCol++; - - if (curCol == curCols.length() && curCols == keyColumns) { - // Switch key->value columns. - curCols = valueColumns; - curCol = 0; - } } /** @@ -674,6 +639,6 @@ public class RowAssembler { * @return Created assembler. */ public static RowAssembler keyAssembler(SchemaDescriptor schema) { - return new RowAssembler(schema.keyColumns(), null, schema.version(), -1); + return new RowAssembler(schema.version(), List.of(schema.keyColumns().columns()), -1); } } diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/RowTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/RowTest.java index 7aa749863f..1d57e237f3 100644 --- a/modules/schema/src/test/java/org/apache/ignite/internal/schema/RowTest.java +++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/RowTest.java @@ -580,7 +580,7 @@ public class RowTest { private static void checkValues(SchemaDescriptor schema, Object... 
vals) { assertEquals(schema.keyColumns().length() + schema.valueColumns().length(), vals.length); - RowAssembler asm = new RowAssembler(schema); + RowAssembler asm = new RowAssembler(schema, -1); for (Object val : vals) { asm.appendValue(val); diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapterTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapterTest.java index 0d048ab912..0412b62d8d 100644 --- a/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapterTest.java +++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapterTest.java @@ -204,7 +204,7 @@ public class UpgradingRowAdapterTest { private static BinaryRow serializeValuesToRow(SchemaDescriptor schema, List<Object> vals) { assertEquals(schema.keyColumns().length() + schema.valueColumns().length(), vals.size()); - RowAssembler asm = new RowAssembler(schema); + RowAssembler asm = new RowAssembler(schema, -1); for (Object val : vals) { asm.appendValue(val); diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java index ec1d9200bb..714e7a05f6 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java @@ -383,7 +383,7 @@ public abstract class ItAbstractInternalTableScanTest extends IgniteAbstractTest * @return {@link BinaryRow} based on given key and value. 
*/ private static BinaryRow prepareRow(String entryKey, String entryVal) { - return new RowAssembler(ROW_SCHEMA) + return new RowAssembler(ROW_SCHEMA, -1) .appendString(Objects.requireNonNull(entryKey, "entryKey")) .appendString(Objects.requireNonNull(entryVal, "entryVal")) .build(); diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java index a9ff0fb883..b66c0336d3 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java @@ -304,7 +304,7 @@ public class ItInternalTableReadOnlyOperationsTest extends IgniteAbstractTest { * @return Row. */ private static Row createKeyValueRow(long id, long value) { - RowAssembler rowBuilder = new RowAssembler(SCHEMA); + RowAssembler rowBuilder = new RowAssembler(SCHEMA, -1); rowBuilder.appendLong(id); rowBuilder.appendLong(value); diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java index f9ec44d211..0d625a9509 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java @@ -314,7 +314,7 @@ public class ReplicaUnavailableTest extends IgniteAbstractTest { } private static BinaryRow createKeyValueRow(long id, long value) { - RowAssembler rowBuilder = new RowAssembler(SCHEMA); + RowAssembler rowBuilder = new RowAssembler(SCHEMA, -1); rowBuilder.appendLong(id); rowBuilder.appendLong(value); diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/TupleMarshallerImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/TupleMarshallerImpl.java index fa04671951..87b8e7c3c7 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/TupleMarshallerImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/TupleMarshallerImpl.java @@ -21,14 +21,13 @@ import static org.apache.ignite.internal.schema.marshaller.MarshallerUtil.getVal import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Set; import org.apache.ignite.internal.binarytuple.BinaryTupleContainer; import org.apache.ignite.internal.binarytuple.BinaryTupleReader; import org.apache.ignite.internal.schema.BinaryRowImpl; import org.apache.ignite.internal.schema.Column; -import org.apache.ignite.internal.schema.Columns; import org.apache.ignite.internal.schema.SchemaAware; import org.apache.ignite.internal.schema.SchemaDescriptor; import org.apache.ignite.internal.schema.SchemaMismatchException; @@ -43,14 +42,12 @@ import org.jetbrains.annotations.Nullable; * Tuple marshaller implementation. */ public class TupleMarshallerImpl implements TupleMarshaller { - /** Poison object. */ private static final Object POISON_OBJECT = new Object(); - /** Schema manager. */ private final SchemaDescriptor schema; /** - * Creates tuple marshaller. + * Creates marshaller for given schema. * * @param schema Schema. 
*/ @@ -86,16 +83,17 @@ public class TupleMarshallerImpl implements TupleMarshaller { } } - InternalTuple keyTuple0 = toInternalTuple(schema, tuple, true); - InternalTuple valTuple0 = toInternalTuple(schema, tuple, false); + ValuesWithStatistics valuesWithStatistics = new ValuesWithStatistics(); - if (valTuple0.knownColumns() + keyTuple0.knownColumns() != tuple.columnCount()) { + gatherStatistics(schema.columns(), tuple, valuesWithStatistics); + + if (valuesWithStatistics.knownColumns != tuple.columnCount()) { throw new SchemaMismatchException( String.format("Tuple doesn't match schema: schemaVersion=%s, extraColumns=%s", schema.version(), extraColumnNames(tuple, schema))); } - return buildRow(schema, keyTuple0, valTuple0); + return buildRow(false, valuesWithStatistics); } catch (Exception ex) { throw new TupleMarshallerException("Failed to marshal tuple.", ex); } @@ -105,153 +103,102 @@ public class TupleMarshallerImpl implements TupleMarshaller { @Override public Row marshal(Tuple keyTuple, @Nullable Tuple valTuple) throws TupleMarshallerException { try { - InternalTuple keyTuple0 = toInternalTuple(schema, keyTuple, true); - InternalTuple valTuple0 = toInternalTuple(schema, valTuple, false); + ValuesWithStatistics valuesWithStatistics = new ValuesWithStatistics(); + + gatherStatistics(keyColumns(), keyTuple, valuesWithStatistics); - if (keyTuple0.knownColumns() != keyTuple.columnCount()) { + if (valuesWithStatistics.knownColumns != keyTuple.columnCount()) { throw new SchemaMismatchException( String.format("Key tuple doesn't match schema: schemaVersion=%s, extraColumns=%s", schema.version(), extraColumnNames(keyTuple, true, schema))); } - if (valTuple != null && valTuple0.knownColumns() != valTuple.columnCount()) { - throw new SchemaMismatchException( - String.format("Value tuple doesn't match schema: schemaVersion=%s, extraColumns=%s", - schema.version(), extraColumnNames(valTuple, false, schema))); + boolean keyOnly = valTuple == null; + if (!keyOnly) { + 
gatherStatistics(valueColumns(), valTuple, valuesWithStatistics); + + if ((valuesWithStatistics.knownColumns - keyTuple.columnCount()) != valTuple.columnCount()) { + throw new SchemaMismatchException( + String.format("Value tuple doesn't match schema: schemaVersion=%s, extraColumns=%s", + schema.version(), extraColumnNames(valTuple, false, schema))); + } } - return buildRow(schema, keyTuple0, valTuple0); + return buildRow(keyOnly, valuesWithStatistics); } catch (Exception ex) { throw new TupleMarshallerException("Failed to marshal tuple.", ex); } } - /** - * Marshal tuple to a row. - * - * @param schema Schema. - * @param keyTuple0 Internal key tuple. - * @param valTuple0 Internal value tuple. - * @return Row. - * @throws SchemaMismatchException If failed to write tuple column. - */ - private Row buildRow(SchemaDescriptor schema, InternalTuple keyTuple0, InternalTuple valTuple0) throws SchemaMismatchException { - RowAssembler rowBuilder = createAssembler(schema, keyTuple0, valTuple0); - - Columns keyColumns = schema.keyColumns(); - - for (int i = 0, len = keyColumns.length(); i < len; i++) { - Column col = keyColumns.column(i); - - writeColumn(rowBuilder, col, keyTuple0); - } - - if (schema.valueColumns().length() == 0 || valTuple0.tuple == null) { - return Row.wrapKeyOnlyBinaryRow(schema, rowBuilder.build()); - } - - Columns valueColumns = schema.valueColumns(); - - for (int i = 0, len = valueColumns.length(); i < len; i++) { - Column col = valueColumns.column(i); - - writeColumn(rowBuilder, col, valTuple0); - } - - return Row.wrapBinaryRow(schema, rowBuilder.build()); - } - /** {@inheritDoc} */ @Override public Row marshalKey(Tuple keyTuple) throws TupleMarshallerException { try { - InternalTuple keyTuple0 = toInternalTuple(schema, keyTuple, true); + ValuesWithStatistics valuesWithStatistics = new ValuesWithStatistics(); - if (keyTuple0.knownColumns() < keyTuple.columnCount()) { - throw new SchemaMismatchException("Key tuple contains extra columns: " + 
extraColumnNames(keyTuple, true, schema)); - } + gatherStatistics(keyColumns(), keyTuple, valuesWithStatistics); - final RowAssembler rowBuilder = createAssembler(schema, keyTuple0, InternalTuple.NO_VALUE); - - Columns cols = schema.keyColumns(); - - for (int i = 0, len = cols.length(); i < len; i++) { - final Column col = cols.column(i); - - writeColumn(rowBuilder, col, keyTuple0); + if (valuesWithStatistics.knownColumns < keyTuple.columnCount()) { + throw new SchemaMismatchException("Key tuple contains extra columns: " + extraColumnNames(keyTuple, true, schema)); } - return Row.wrapKeyOnlyBinaryRow(schema, rowBuilder.build()); + return buildRow(true, valuesWithStatistics); } catch (Exception ex) { throw new TupleMarshallerException("Failed to marshal tuple.", ex); } } - /** - * Analyze tuple and wrap into internal tuple. - * - * @param schema Schema. - * @param tuple Key or value tuple. - * @param keyFlag If {@code true} marshal key columns, otherwise marshall value columns. - * @return Internal tuple. - * @throws SchemaMismatchException If tuple doesn't match the schema. - */ - private InternalTuple toInternalTuple(SchemaDescriptor schema, Tuple tuple, boolean keyFlag) throws SchemaMismatchException { - if (tuple == null) { - return InternalTuple.NO_VALUE; + private List<Column> keyColumns() { + return List.of(schema.keyColumns().columns()); + } + + private List<Column> valueColumns() { + return List.of(schema.valueColumns().columns()); + } + + private Row buildRow( + boolean keyOnly, + ValuesWithStatistics values + ) throws SchemaMismatchException { + List<Column> columns = keyOnly ? keyColumns() : schema.columns(); + RowAssembler rowBuilder = new RowAssembler(schema.version(), columns, values.estimatedValueSize); + + for (Column col : columns) { + rowBuilder.appendValue(values.value(col.name())); } - Columns columns = keyFlag ? schema.keyColumns() : schema.valueColumns(); + return keyOnly + ? 
Row.wrapKeyOnlyBinaryRow(schema, rowBuilder.build()) + : Row.wrapBinaryRow(schema, rowBuilder.build()); + } - boolean hasNulls = false; + private void gatherStatistics( + List<Column> columns, + Tuple tuple, + ValuesWithStatistics targetTuple + ) throws SchemaMismatchException { int estimatedValueSize = 0; int knownColumns = 0; - Map<String, Object> defaults = new HashMap<>(); - - if (tuple instanceof SchemaAware && Objects.equals(((SchemaAware) tuple).schema(), schema)) { - for (int i = 0, len = columns.length(); i < len; i++) { - final Column col = columns.column(i); - NativeType colType = col.type(); + for (Column col : columns) { + NativeType colType = col.type(); - Object val = tuple.valueOrDefault(col.name(), POISON_OBJECT); - col.validate(val); + Object val = tuple.valueOrDefault(col.name(), POISON_OBJECT); - assert val != POISON_OBJECT; + if (val == POISON_OBJECT && schema.isKeyColumn(col.schemaIndex())) { + throw new SchemaMismatchException("Missed key column: " + col.name()); + } + if (val == POISON_OBJECT) { + val = col.defaultValue(); + } else { knownColumns++; - - if (val == null) { - hasNulls = true; - } else if (colType.spec().fixedLength()) { - estimatedValueSize += colType.sizeInBytes(); - } else { - estimatedValueSize += getValueSize(val, colType); - } } - } else { - for (int i = 0, len = columns.length(); i < len; i++) { - final Column col = columns.column(i); - NativeType colType = col.type(); - Object val = tuple.valueOrDefault(col.name(), POISON_OBJECT); - - if (val == POISON_OBJECT) { - if (keyFlag) { - throw new SchemaMismatchException("Missed key column: " + col.name()); - } - - val = col.defaultValue(); - - defaults.put(col.name(), val); - } else { - knownColumns++; - } - - col.validate(val); + col.validate(val); + targetTuple.values.put(col.name(), val); - if (val == null) { - hasNulls = true; - } else if (colType.spec().fixedLength()) { + if (val != null) { + if (colType.spec().fixedLength()) { estimatedValueSize += 
colType.sizeInBytes(); } else { estimatedValueSize += getValueSize(val, colType); @@ -259,7 +206,8 @@ public class TupleMarshallerImpl implements TupleMarshaller { } } - return new InternalTuple(tuple, hasNulls, estimatedValueSize, defaults, knownColumns); + targetTuple.estimatedValueSize += estimatedValueSize; + targetTuple.knownColumns += knownColumns; } /** @@ -269,7 +217,7 @@ public class TupleMarshallerImpl implements TupleMarshaller { * @param schema Schema. * @return Extra columns. */ - private Set<String> extraColumnNames(Tuple tuple, SchemaDescriptor schema) { + private static Set<String> extraColumnNames(Tuple tuple, SchemaDescriptor schema) { Set<String> cols = new HashSet<>(); for (int i = 0, len = tuple.columnCount(); i < len; i++) { @@ -291,7 +239,7 @@ public class TupleMarshallerImpl implements TupleMarshaller { * @param schema Schema to check against. * @return Column names. */ - private Set<String> extraColumnNames(Tuple tuple, boolean keyTuple, SchemaDescriptor schema) { + private static Set<String> extraColumnNames(Tuple tuple, boolean keyTuple, SchemaDescriptor schema) { Set<String> cols = new HashSet<>(); for (int i = 0, len = tuple.columnCount(); i < len; i++) { @@ -307,39 +255,6 @@ public class TupleMarshallerImpl implements TupleMarshaller { return cols; } - /** - * Creates {@link RowAssembler} for key-value tuples. - * - * @param schema Schema. - * @param keyTuple Internal key tuple. - * @param valTuple Internal value tuple. - * @return Row assembler. - */ - private static RowAssembler createAssembler(SchemaDescriptor schema, InternalTuple keyTuple, InternalTuple valTuple) { - Columns valueColumns = valTuple.tuple != null ? 
schema.valueColumns() : null; - - int totalValueSize; - if (keyTuple.estimatedValueSize < 0 || valTuple.estimatedValueSize < 0) { - totalValueSize = -1; - } else { - totalValueSize = keyTuple.estimatedValueSize + valTuple.estimatedValueSize; - } - - return new RowAssembler(schema.keyColumns(), valueColumns, schema.version(), totalValueSize); - } - - /** - * Writes column. - * - * @param rowAsm Row assembler. - * @param col Column. - * @param tup Internal tuple. - * @throws SchemaMismatchException If a tuple column value doesn't match the current column type. - */ - private static void writeColumn(RowAssembler rowAsm, Column col, InternalTuple tup) throws SchemaMismatchException { - rowAsm.appendValue(tup.value(col.name())); - } - /** * Determines whether binary tuple rebuild is required. * @@ -367,64 +282,17 @@ public class TupleMarshallerImpl implements TupleMarshaller { } /** - * Internal tuple enriched original tuple with additional info. + * Container to keep columns values and related statistics which help + * to build row with {@link RowAssembler}. */ - private static class InternalTuple { - /** Cached zero statistics. */ - static final InternalTuple NO_VALUE = new InternalTuple(null, false, 0, null, 0); - - /** Original tuple. */ - private final Tuple tuple; - - /** Whether there are NULL values or not. */ - private final boolean hasNulls; - - /** Estimated total size of the object. */ - private final int estimatedValueSize; - - /** Pre-calculated defaults. */ - private final Map<String, Object> defaults; - - /** Schema columns in tuple. */ - private final int knownColumns; - - /** - * Creates internal tuple. - * - * @param tuple Tuple. - * @param hasNulls Whether there are NULL values or not.. - * @param estimatedValueSize Estimated total size of the object. - * @param defaults Default values map. - * @param knownColumns Number of columns that match schema. 
- */ - InternalTuple(Tuple tuple, boolean hasNulls, int estimatedValueSize, Map<String, Object> defaults, int knownColumns) { - this.hasNulls = hasNulls; - this.estimatedValueSize = estimatedValueSize; - this.tuple = tuple; - this.defaults = defaults; - this.knownColumns = knownColumns; - } - - /** - * Returns number of columns that matches schema. - */ - public int knownColumns() { - return knownColumns; - } - - /** - * Returns column value. - * - * @param columnName Columns name. - */ - Object value(String columnName) { - Object val = tuple.valueOrDefault(columnName, POISON_OBJECT); + private static class ValuesWithStatistics { + private final Map<String, Object> values = new HashMap<>(); - if (val == POISON_OBJECT) { - return defaults.get(columnName); - } + private int estimatedValueSize; + private int knownColumns; - return val; + @Nullable Object value(String columnName) { + return values.get(columnName); } } } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/ColocationHashCalculationTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/ColocationHashCalculationTest.java index 780711e386..e7a528702e 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/ColocationHashCalculationTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/ColocationHashCalculationTest.java @@ -73,7 +73,7 @@ public class ColocationHashCalculationTest { }, new Column[]{new Column(3, "val", INT32, true).copy(3)}); - RowAssembler rasm = new RowAssembler(schema); + RowAssembler rasm = new RowAssembler(schema, -1); rasm.appendByte((byte) 1); rasm.appendInt(2); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java index 8781f13a23..705ed4eb9f 100644 --- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java @@ -925,7 +925,7 @@ public class PartitionCommandListenerTest extends BaseIgniteAbstractTest { * @return Row. */ private BinaryRowMessage getTestRow(int key, int val) { - RowAssembler rowBuilder = new RowAssembler(SCHEMA); + RowAssembler rowBuilder = new RowAssembler(SCHEMA, -1); rowBuilder.appendInt(key); rowBuilder.appendInt(val); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java index 15d9e33141..133d39d895 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java @@ -416,7 +416,7 @@ public class IncomingSnapshotCopierTest extends BaseIgniteAbstractTest { } private static BinaryRow createRow(String key, String value) { - return new RowAssembler(SCHEMA_DESCRIPTOR) + return new RowAssembler(SCHEMA_DESCRIPTOR, -1) .appendStringNotNull(key) .appendStringNotNull(value) .build();