This is an automated email from the ASF dual-hosted git repository. korlov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 8314beacd1 IGNITE-18900 Sql. Remove hash function creation from ExecutionContext (#1756) 8314beacd1 is described below commit 8314beacd11ef54690d2ca350a8a043bbf21e262 Author: korlov42 <kor...@gridgain.com> AuthorDate: Tue Mar 7 11:42:24 2023 +0200 IGNITE-18900 Sql. Remove hash function creation from ExecutionContext (#1756) --- .../ignite/client/fakes/FakeInternalTable.java | 6 ++ .../internal/sql/engine/exec/ExecutionContext.java | 65 +-------------------- .../sql/engine/exec/ExecutionServiceImpl.java | 3 +- .../sql/engine/schema/IgniteTableImpl.java | 66 +++++++++------------- .../sql/engine/exec/RuntimeSortedIndexTest.java | 1 - .../sql/engine/exec/rel/AbstractExecutionTest.java | 3 +- .../engine/exec/rel/MergeJoinExecutionTest.java | 2 +- .../sql/engine/framework/TestBuilders.java | 3 +- .../ignite/internal/table/InternalTable.java | 8 +++ .../distributed/storage/InternalTableImpl.java | 27 ++++----- 10 files changed, 56 insertions(+), 128 deletions(-) diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java index 50036afd99..ba232b6580 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java +++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java @@ -100,6 +100,12 @@ public class FakeInternalTable implements InternalTable { return tableName; } + /** {@inheritDoc} */ + @Override + public int partitionId(BinaryRowEx row) { + return 0; + } + /** {@inheritDoc} */ @Override public CompletableFuture<BinaryRow> get(BinaryRowEx keyRow, @Nullable InternalTransaction tx) { diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java index 053c921b97..3a8770f1ea 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java @@ -20,8 +20,6 @@ package org.apache.ignite.internal.sql.engine.exec; import static org.apache.ignite.lang.ErrorGroups.Common.UNEXPECTED_ERR; import java.lang.reflect.Type; -import java.util.Arrays; -import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; @@ -44,8 +42,6 @@ import org.apache.ignite.internal.sql.engine.metadata.FragmentDescription; import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory; import org.apache.ignite.internal.sql.engine.util.AbstractQueryContext; import org.apache.ignite.internal.sql.engine.util.BaseQueryContext; -import org.apache.ignite.internal.sql.engine.util.HashFunctionFactory; -import org.apache.ignite.internal.sql.engine.util.HashFunctionFactory.RowHashFunction; import org.apache.ignite.internal.sql.engine.util.TypeUtils; import org.apache.ignite.lang.IgniteInternalException; import org.apache.ignite.network.ClusterNode; @@ -82,10 +78,7 @@ public class ExecutionContext<RowT> extends AbstractQueryContext implements Data private final ExpressionFactory<RowT> expressionFactory; - private final HashFunctionFactory<RowT> hashFunctionFactory; - private final AtomicBoolean cancelFlag = new AtomicBoolean(); - private final Map<HashFunctionCacheKey, RowHashFunction<RowT>> hashFunctionCache = new HashMap<>(); /** * Need to store timestamp, since SQL standard says that functions such as CURRENT_TIMESTAMP return the same value throughout the @@ -117,8 +110,7 @@ public class ExecutionContext<RowT> extends AbstractQueryContext implements Data FragmentDescription fragmentDesc, RowHandler<RowT> handler, Map<String, Object> params, - TxAttributes txAttributes, - HashFunctionFactory<RowT> hashFunctionFactory + TxAttributes txAttributes ) { super(qctx); @@ -130,7 +122,6 @@ public class ExecutionContext<RowT> extends AbstractQueryContext implements Data this.params = params; this.localNode = localNode; this.originatingNodeName = originatingNodeName; - this.hashFunctionFactory = hashFunctionFactory; this.txAttributes = txAttributes; expressionFactory = new ExpressionFactoryImpl<>( @@ -221,28 +212,6 @@ public class ExecutionContext<RowT> extends AbstractQueryContext implements Data return localNode; } - /** - * Returns the function to compute colocation hash for specified table. - * - * @param tableId An identifier of a table. This identifier will be used to acquire expected types - * of the colocation fields. - * @param fields Indexes of the fields representing colocation columns. This is a projection of - * the colocation fields of specified table on actual row. For example, type of the row to insert - * equals to the type of table's row, thus passed fields should match the colocation columns of the table. - * But row for delete may contain the primary fields only, thus we need to project colocation fields - * on this trimmed row. - * @return A hash function. - */ - // this is more like workaround to limit scope of the refactoring, - // but it definitely need to be fixed - // TODO: https://issues.apache.org/jira/browse/IGNITE-18900 - public RowHashFunction<RowT> hashFunction(UUID tableId, int[] fields) { - return hashFunctionCache.computeIfAbsent( - new HashFunctionCacheKey(tableId, fields), - k -> hashFunctionFactory.create(fields, tableId) - ); - } - /** {@inheritDoc} */ @Override public SchemaPlus getRootSchema() { @@ -431,36 +400,4 @@ public class ExecutionContext<RowT> extends AbstractQueryContext implements Data public int hashCode() { return Objects.hash(qryId, fragmentDesc.fragmentId()); } - - private static class HashFunctionCacheKey { - private final UUID tableId; - private final int[] fields; - - private HashFunctionCacheKey(UUID tableId, int[] fields) { - this.tableId = tableId; - this.fields = fields; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - HashFunctionCacheKey that = (HashFunctionCacheKey) o; - - if (!tableId.equals(that.tableId)) { - return false; - } - return Arrays.equals(fields, that.fields); - } - - @Override - public int hashCode() { - return tableId.hashCode(); - } - } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java index edc278be88..817259d19f 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java @@ -550,8 +550,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve desc, handler, Commons.parametersMap(ctx.parameters()), - txAttributes, - new HashFunctionFactoryImpl<>(sqlSchemaManager, handler) + txAttributes ); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java index d019eddf6f..f77e9d4e70 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java @@ -30,7 +30,6 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.ToIntFunction; import java.util.stream.Collectors; import org.apache.calcite.plan.Convention; import org.apache.calcite.plan.RelOptCluster; @@ -53,6 +52,7 @@ import org.apache.ignite.internal.replicator.ReplicaService; import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.replicator.message.ReplicaRequest; import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.schema.BinaryRowEx; import org.apache.ignite.internal.schema.NativeTypeSpec; import org.apache.ignite.internal.schema.SchemaDescriptor; import org.apache.ignite.internal.schema.SchemaRegistry; @@ -72,14 +72,12 @@ import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Type; import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution; import org.apache.ignite.internal.sql.engine.trait.TraitUtils; import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory; -import org.apache.ignite.internal.sql.engine.util.HashFunctionFactory.RowHashFunction; import org.apache.ignite.internal.sql.engine.util.TypeUtils; import org.apache.ignite.internal.storage.MvPartitionStorage; import org.apache.ignite.internal.table.InternalTable; import org.apache.ignite.internal.table.distributed.TableMessagesFactory; import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId; import org.apache.ignite.internal.table.distributed.replicator.action.RequestType; -import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.lang.ErrorGroups; import org.apache.ignite.sql.SqlException; import org.jetbrains.annotations.Nullable; @@ -110,8 +108,7 @@ public class IgniteTableImpl extends AbstractTable implements IgniteTable, Updat private final List<ColumnDescriptor> columnsOrderedByPhysSchema; - private final int[] deleteRowHashFields; - private final int[] upsertRowHashFields; + private final PartitionExtractor partitionExtractor; /** * Constructor. @@ -133,28 +130,19 @@ public class IgniteTableImpl extends AbstractTable implements IgniteTable, Updat this.clock = clock; this.schemaRegistry = schemaRegistry; this.schemaDescriptor = schemaRegistry.schema(); + this.partitionExtractor = table::partitionId; assert schemaDescriptor != null; - BitSet keyFields = new BitSet(); List<ColumnDescriptor> tmp = new ArrayList<>(desc.columnsCount()); for (int i = 0; i < desc.columnsCount(); i++) { - ColumnDescriptor descriptor = desc.columnDescriptor(i); - - tmp.add(descriptor); - - if (descriptor.key()) { - keyFields.set(descriptor.logicalIndex()); - } + tmp.add(desc.columnDescriptor(i)); } tmp.sort(Comparator.comparingInt(ColumnDescriptor::physicalIndex)); columnsOrderedByPhysSchema = tmp; - upsertRowHashFields = desc.distribution().getKeys().toIntArray(); - deleteRowHashFields = project(desc.columnsCount(), upsertRowHashFields, keyFields); - statistic = new StatisticsImpl(); } @@ -168,8 +156,7 @@ public class IgniteTableImpl extends AbstractTable implements IgniteTable, Updat this.schemaDescriptor = t.schemaDescriptor; this.statistic = t.statistic; this.columnsOrderedByPhysSchema = t.columnsOrderedByPhysSchema; - this.upsertRowHashFields = t.upsertRowHashFields; - this.deleteRowHashFields = t.deleteRowHashFields; + this.partitionExtractor = t.partitionExtractor; this.indexes.putAll(t.indexes); } @@ -349,23 +336,20 @@ public class IgniteTableImpl extends AbstractTable implements IgniteTable, Updat assert commitPartitionId != null; UUID tableId = table.tableId(); - RowHashFunction<RowT> hashFunction = ectx.hashFunction(tableId, upsertRowHashFields); - ToIntFunction<RowT> partitionExtractor = row -> IgniteUtils.safeAbs(hashFunction.hashOf(row) % table.partitions()); - - Int2ObjectOpenHashMap<List<BinaryRow>> keyRowsByPartition = new Int2ObjectOpenHashMap<>(); + Int2ObjectOpenHashMap<List<BinaryRow>> rowsByPartition = new Int2ObjectOpenHashMap<>(); for (RowT row : rows) { - BinaryRow binaryRow = convertRow(row, ectx, false); + BinaryRowEx binaryRow = convertRow(row, ectx, false); - keyRowsByPartition.computeIfAbsent(partitionExtractor.applyAsInt(row), k -> new ArrayList<>()).add(binaryRow); + rowsByPartition.computeIfAbsent(partitionExtractor.fromRow(binaryRow), k -> new ArrayList<>()).add(binaryRow); } - CompletableFuture<List<RowT>>[] futures = new CompletableFuture[keyRowsByPartition.size()]; + CompletableFuture<List<RowT>>[] futures = new CompletableFuture[rowsByPartition.size()]; int batchNum = 0; - for (Int2ObjectMap.Entry<List<BinaryRow>> partToRows : keyRowsByPartition.int2ObjectEntrySet()) { + for (Int2ObjectMap.Entry<List<BinaryRow>> partToRows : rowsByPartition.int2ObjectEntrySet()) { TablePartitionId partGroupId = new TablePartitionId(tableId, partToRows.getIntKey()); NodeWithTerm nodeWithTerm = ectx.description().mapping().updatingTableAssignments().get(partToRows.getIntKey()); @@ -399,23 +383,20 @@ public class IgniteTableImpl extends AbstractTable implements IgniteTable, Updat RowHandler<RowT> handler = ectx.rowHandler(); UUID tableId = table.tableId(); - RowHashFunction<RowT> hashFunction = ectx.hashFunction(tableId, upsertRowHashFields); - ToIntFunction<RowT> partitionExtractor = row -> IgniteUtils.safeAbs(hashFunction.hashOf(row) % table.partitions()); - - Int2ObjectOpenHashMap<List<BinaryRow>> keyRowsByPartition = new Int2ObjectOpenHashMap<>(); + Int2ObjectOpenHashMap<List<BinaryRow>> rowsByPartition = new Int2ObjectOpenHashMap<>(); for (RowT row : rows) { - BinaryRow binaryRow = convertRow(row, ectx, false); + BinaryRowEx binaryRow = convertRow(row, ectx, false); - keyRowsByPartition.computeIfAbsent(partitionExtractor.applyAsInt(row), k -> new ArrayList<>()).add(binaryRow); + rowsByPartition.computeIfAbsent(partitionExtractor.fromRow(binaryRow), k -> new ArrayList<>()).add(binaryRow); } - CompletableFuture<List<RowT>>[] futures = new CompletableFuture[keyRowsByPartition.size()]; + CompletableFuture<List<RowT>>[] futures = new CompletableFuture[rowsByPartition.size()]; int batchNum = 0; - for (Int2ObjectMap.Entry<List<BinaryRow>> partToRows : keyRowsByPartition.int2ObjectEntrySet()) { + for (Int2ObjectMap.Entry<List<BinaryRow>> partToRows : rowsByPartition.int2ObjectEntrySet()) { TablePartitionId partGroupId = new TablePartitionId(tableId, partToRows.getIntKey()); NodeWithTerm nodeWithTerm = ectx.description().mapping().updatingTableAssignments().get(partToRows.getIntKey()); @@ -467,16 +448,13 @@ public class IgniteTableImpl extends AbstractTable implements IgniteTable, Updat assert commitPartitionId != null; UUID tableId = table.tableId(); - RowHashFunction<RowT> hashFunction = ectx.hashFunction(tableId, deleteRowHashFields); - - ToIntFunction<RowT> partitionExtractor = row -> IgniteUtils.safeAbs(hashFunction.hashOf(row) % table.partitions()); Int2ObjectOpenHashMap<List<BinaryRow>> keyRowsByPartition = new Int2ObjectOpenHashMap<>(); for (RowT row : rows) { - BinaryRow binaryRow = convertRow(row, ectx, true); + BinaryRowEx binaryRow = convertRow(row, ectx, true); - keyRowsByPartition.computeIfAbsent(partitionExtractor.applyAsInt(row), k -> new ArrayList<>()).add(binaryRow); + keyRowsByPartition.computeIfAbsent(partitionExtractor.fromRow(binaryRow), k -> new ArrayList<>()).add(binaryRow); } CompletableFuture<List<RowT>>[] futures = new CompletableFuture[keyRowsByPartition.size()]; @@ -503,7 +481,7 @@ public class IgniteTableImpl extends AbstractTable implements IgniteTable, Updat return CompletableFuture.allOf(futures); } - private <RowT> BinaryRow convertRow(RowT row, ExecutionContext<RowT> ectx, boolean keyOnly) { + private <RowT> BinaryRowEx convertRow(RowT row, ExecutionContext<RowT> ectx, boolean keyOnly) { RowHandler<RowT> hnd = ectx.rowHandler(); boolean hasNulls = false; @@ -659,4 +637,12 @@ public class IgniteTableImpl extends AbstractTable implements IgniteTable, Updat return new SqlException(ErrorGroups.Sql.DUPLICATE_KEYS_ERR, "PK unique constraint is violated"); } + + /** + * Extracts an identifier of partition from a given row. + */ + @FunctionalInterface + private interface PartitionExtractor { + int fromRow(BinaryRowEx row); + } } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java index 5768b4a207..af6d1daf2b 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java @@ -118,7 +118,6 @@ public class RuntimeSortedIndexTest extends IgniteAbstractTest { null, ArrayRowHandler.INSTANCE, Map.of(), - null, null ), RelCollations.of(ImmutableIntList.copyOf(idxCols)), diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java index 35ebd03a82..8b1e7bc413 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java @@ -111,8 +111,7 @@ public class AbstractExecutionTest extends IgniteAbstractTest { fragmentDesc, ArrayRowHandler.INSTANCE, Map.of(), - TxAttributes.fromTx(new NoOpTransaction("fake-test-node")), - null + TxAttributes.fromTx(new NoOpTransaction("fake-test-node")) ); } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java index ac78a05703..ee4680ed19 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java @@ -488,7 +488,7 @@ public class MergeJoinExecutionTest extends AbstractExecutionTest { ExecutionContext<Object[]> ectx = new ExecutionContext<>(BaseQueryContext.builder().logger(log).build(), null, null, null, - null, null, ArrayRowHandler.INSTANCE, null, null, null); + null, null, ArrayRowHandler.INSTANCE, null, null); ExpressionFactoryImpl<Object[]> expFactory = new ExpressionFactoryImpl<>(ectx, typeFactory, SqlConformanceEnum.DEFAULT); diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java index 3d8e552231..868c712cbf 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java @@ -219,8 +219,7 @@ public class TestBuilders { description, ArrayRowHandler.INSTANCE, Map.of(), - TxAttributes.fromTx(new NoOpTransaction(node.name())), - null + TxAttributes.fromTx(new NoOpTransaction(node.name())) ); } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java index e7b176636a..8b97f5d05a 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java @@ -66,6 +66,14 @@ public interface InternalTable extends ManuallyCloseable { */ String name(); + /** + * Extracts an identifier of a partition from a given row. + * + * @param row A row to extract partition from. + * @return An identifier of a partition the row belongs to. + */ + int partitionId(BinaryRowEx row); + /** * Asynchronously gets a row with same key columns values as given one from the table. * diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java index 61b9d5cad0..deb430c1e8 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java @@ -77,6 +77,7 @@ import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.internal.tx.LockException; import org.apache.ignite.internal.tx.TxManager; import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage; +import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.utils.PrimaryReplica; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteException; @@ -227,7 +228,7 @@ public class InternalTableImpl implements InternalTable { final InternalTransaction tx0 = implicit ? txManager.begin() : tx; - int partId = partId(row); + int partId = partitionId(row); TablePartitionId partGroupId = new TablePartitionId(tableId, partId); @@ -497,7 +498,7 @@ public class InternalTableImpl implements InternalTable { @Override public CompletableFuture<BinaryRow> get(BinaryRowEx keyRow, InternalTransaction tx) { if (tx != null && tx.isReadOnly()) { - return evaluateReadOnlyRecipientNode(partId(keyRow)) + return evaluateReadOnlyRecipientNode(partitionId(keyRow)) .thenCompose(recipientNode -> get(keyRow, tx.readTimestamp(), recipientNode)); } else { return enlistInTx( @@ -522,7 +523,7 @@ public class InternalTableImpl implements InternalTable { @NotNull HybridTimestamp readTimestamp, @NotNull ClusterNode recipientNode ) { - int partId = partId(keyRow); + int partId = partitionId(keyRow); ReplicationGroupId partGroupId = partitionMap.get(partId).groupId(); return replicaSvc.invoke(recipientNode, tableMessagesFactory.readOnlySingleRowReplicaRequest() @@ -543,7 +544,7 @@ public class InternalTableImpl implements InternalTable { if (firstRow == null) { return CompletableFuture.completedFuture(Collections.emptyList()); } else { - return evaluateReadOnlyRecipientNode(partId(firstRow)) + return evaluateReadOnlyRecipientNode(partitionId(firstRow)) .thenCompose(recipientNode -> getAll(keyRows, tx.readTimestamp(), recipientNode)); } } else { @@ -1049,7 +1050,7 @@ public class InternalTableImpl implements InternalTable { Int2ObjectOpenHashMap<List<BinaryRow>> keyRowsByPartition = new Int2ObjectOpenHashMap<>(); for (BinaryRowEx keyRow : rows) { - keyRowsByPartition.computeIfAbsent(partId(keyRow), k -> new ArrayList<>()).add(keyRow); + keyRowsByPartition.computeIfAbsent(partitionId(keyRow), k -> new ArrayList<>()).add(keyRow); } return keyRowsByPartition; @@ -1141,7 +1142,7 @@ public class InternalTableImpl implements InternalTable { @TestOnly @Override public int partition(BinaryRowEx keyRow) { - return partId(keyRow); + return partitionId(keyRow); } /** @@ -1162,16 +1163,10 @@ public class InternalTableImpl implements InternalTable { })); } - /** - * Get partition id by key row. - * - * @param row Key row. - * @return partition id. - */ - private int partId(BinaryRowEx row) { - int partId = row.colocationHash() % partitions; - - return (partId < 0) ? -partId : partId; + /** {@inheritDoc} */ + @Override + public int partitionId(BinaryRowEx row) { + return IgniteUtils.safeAbs(row.colocationHash()) % partitions; } /**